```
├── .github/
│   └── workflows/
│       └── publish.yml
├── .gitignore
├── .vscode/
│   └── settings.json
├── LICENSE
├── README.md
├── index/
│   ├── __init__.py
│   ├── agent/
│   │   ├── agent.py
│   │   ├── demo_images/
│   │   │   ├── complex_layout_highlight.png
│   │   │   ├── complex_layout_small_elements.png
│   │   │   ├── loading.png
│   │   │   ├── loading2.png
│   │   │   └── scroll.png
│   │   ├── message_manager.py
│   │   ├── models.py
│   │   ├── prompts.py
│   │   └── utils.py
│   ├── browser/
│   │   ├── browser.py
│   │   ├── detector.py
│   │   ├── findVisibleInteractiveElements.js
│   │   ├── fonts/
│   │   │   └── OpenSans-Medium.ttf
│   │   ├── models.py
│   │   └── utils.py
│   ├── cli.py
│   ├── controller/
│   │   ├── controller.py
│   │   └── default_actions.py
│   └── llm/
│       ├── llm.py
│       └── providers/
│           ├── __init__.py
│           ├── anthropic.py
│           ├── anthropic_bedrock.py
│           ├── gemini.py
│           └── openai.py
├── pyproject.toml
├── static/
│   └── traces.png
└── uv.lock
```

## /.github/workflows/publish.yml

```yml path="/.github/workflows/publish.yml"
name: Publish Python Package

on:
  push:
    tags:
      - 'v*'

permissions:
  contents: read

jobs:
  publish:
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/lmnr/
    permissions:
      id-token: write
    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install the project
        run: uv sync --all-extras --dev

      - name: Verify tag matches package version
        run: |
          # Extract version from tag (remove 'v' prefix)
          TAG_VERSION=${GITHUB_REF#refs/tags/v}

          # Extract version from pyproject.toml
          PACKAGE_VERSION=$(grep -oP '(?<=version = ")[^"]+' pyproject.toml)

          echo "Tag version: $TAG_VERSION"
          echo "Package version: $PACKAGE_VERSION"

          # Check if versions match
          if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
            echo "Error: Tag version ($TAG_VERSION) does not match package version ($PACKAGE_VERSION)"
            exit 1
          fi

      - name: Build package
        run: uv build

      - name: Publish package
        uses: pypa/gh-action-pypi-publish@release/v1
```

## /.gitignore

```gitignore path="/.gitignore"
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
```

## /.vscode/settings.json

```json path="/.vscode/settings.json"
{
  "[python]": {
    "editor.codeActionsOnSave": {
      "source.fixAll": "explicit",
      "source.organizeImports": "explicit"
    },
    "editor.defaultFormatter": "charliermarsh.ruff"
  }
}
```

## /LICENSE

``` path="/LICENSE"
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/

TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION

1. Definitions.

"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.

"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.

"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.

"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.

"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.

"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.

"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).

"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.

"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."

"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.

2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.

3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.

4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:

(a) You must give any other recipients of the Work or Derivative Works a copy of this License; and

(b) You must cause any modified files to carry prominent notices stating that You changed the files; and

(c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and

(d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.

You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.

5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.

6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.

7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.

8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.

9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.

END OF TERMS AND CONDITIONS

APPENDIX: How to apply the Apache License to your work.

To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
Copyright [yyyy] [name of copyright owner]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

## /README.md

![Static Badge](https://img.shields.io/badge/Y%20Combinator-S24-orange)
![X (formerly Twitter) Follow](https://img.shields.io/twitter/follow/lmnrai)
![Static Badge](https://img.shields.io/badge/Join_Discord-464646?&logo=discord&logoColor=5865F2)

# Index

Index is the SOTA open-source browser agent for autonomously executing complex tasks on the web.

- [x] Powered by reasoning LLMs with vision capabilities.
    - [x] Gemini 2.5 Pro (really fast and accurate)
    - [x] Claude 3.7 Sonnet with extended thinking (reliable and accurate)
    - [x] OpenAI o4-mini (depending on the reasoning effort, provides a good balance between speed, cost, and accuracy)
    - [x] Gemini 2.5 Flash (really fast, cheap, and good for less complex tasks)
- [x] `pip install lmnr-index` and use it in your project
- [x] `index run` to run the agent in the interactive CLI
- [x] Index is also available as a [serverless API](https://docs.lmnr.ai/laminar-index/introduction).
- [x] You can also try out Index via [Chat UI](https://lmnr.ai/chat).
- [x] Supports advanced [browser agent observability](https://docs.lmnr.ai/laminar-index/observability) powered by the open-source platform [Laminar](https://github.com/lmnr-ai/lmnr).

prompt: go to ycombinator.com. summarize first 3 companies in the W25 batch and make new spreadsheet in google sheets.
https://github.com/user-attachments/assets/2b46ee20-81b6-4188-92fb-4d97fe0b3d6a

## Documentation

Check out the full documentation [here](https://docs.lmnr.ai/index-agent/getting-started).

## Index API

The easiest way to use Index in production is via the [serverless API](https://docs.lmnr.ai/laminar-index/introduction). Index API manages remote browser sessions, agent infrastructure, and [browser observability](https://docs.lmnr.ai/laminar-index/tracing).

To get started, [sign up](https://lmnr.ai/sign-in) and create a project API key. Read the [docs](https://docs.lmnr.ai/laminar-index/introduction) to learn more.

### Install Laminar

```bash
pip install lmnr
```

### Use Index via API

```python
from lmnr import Laminar, LaminarClient

# you can also set the LMNR_PROJECT_API_KEY environment variable

# Initialize tracing
Laminar.initialize(project_api_key="your_api_key")

# Initialize the client
client = LaminarClient(project_api_key="your_api_key")

for chunk in client.agent.run(
    stream=True,
    model_provider="gemini",
    model="gemini-2.5-pro-preview-03-25",
    prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
):
    print(chunk)
```

## Local Quick Start

### Install dependencies

```bash
pip install lmnr-index

# Install playwright
playwright install chromium
```

### Setup model API keys

Set up your model API keys in a `.env` file in your project root:

```
ANTHROPIC_API_KEY=
GEMINI_API_KEY=
OPENAI_API_KEY=
```

### Run the agent with CLI

You can run Index via the interactive CLI. It features:

- Browser state persistence between sessions
- Follow-up messages with support for the "give human control" action
- Real-time streaming updates
- Beautiful terminal UI using Textual

You can run the agent with the following command. Remember to set the API key for the selected model in the `.env` file.
```bash
index run
```

Output will look like this:

```
Loaded existing browser state

╭───────────────────── Interactive Mode ─────────────────────╮
│ Index Browser Agent Interactive Mode                       │
│ Type your message and press Enter. The agent will respond. │
│ Press Ctrl+C to exit.                                      │
╰────────────────────────────────────────────────────────────╯

Choose an LLM model:
1. Gemini 2.5 Flash
2. Claude 3.7 Sonnet
3. OpenAI o4-mini
Select model [1/2] (1): 3

Using OpenAI model: o4-mini
Loaded existing browser state

Your message: go to lmnr.ai, summarize pricing page

Agent is working...

Step 1: Opening lmnr.ai
Step 2: Opening Pricing page
Step 3: Scrolling for more pricing details
Step 4: Scrolling back up to view pricing tiers
Step 5: Provided concise summary of the three pricing tiers
```

### Running with a personal Chrome instance

You can use Index with your personal Chrome browser instance instead of launching a new browser. The main advantage is that all your existing logged-in sessions will be available.

```bash
# Basic usage with default Chrome path
index run --local-chrome

# With custom Chrome path and debugging port
index run --local-chrome --chrome-path="/path/to/chrome" --port=9223
```

This will launch Chrome with remote debugging enabled and connect Index to it.

#### OS-specific Chrome paths

Default Chrome executable paths on different operating systems:

**macOS**:
```bash
index run --local-chrome --chrome-path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
```

**Windows**:
```bash
index run --local-chrome --chrome-path="C:\Program Files\Google\Chrome\Application\chrome.exe"
```

#### Connecting to an already running Chrome instance

If you already have Chrome running with remote debugging enabled, you can connect to it:

1. Launch Chrome with debugging enabled:

```bash
# macOS
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222

# Windows
"C:\Program Files\Google\Chrome\Application\chrome.exe" --remote-debugging-port=9222
```

2.
Then run Index with the same port:

```bash
index run --local-chrome --port=9222
```

### Run the agent with code

```python
import asyncio
from index import Agent, AnthropicProvider


async def main():
    llm = AnthropicProvider(
        model="claude-3-7-sonnet-20250219",
        enable_thinking=True,
        thinking_token_budget=2048)
    # llm = OpenAIProvider(model="o4-mini")  # you can also use OpenAI models

    agent = Agent(llm=llm)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
    )
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```

### Stream the agent's output

```python
async for chunk in agent.run_stream(
    prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
):
    print(chunk)
```

### Enable browser agent observability

To trace Index agent's actions and record the browser session, you simply need to initialize Laminar tracing before running the agent.

```python
from lmnr import Laminar

Laminar.initialize(project_api_key="your_api_key")
```

Then you will get full observability of the agent's actions, synced with the browser session, in the Laminar platform.
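Since `agent.run_stream` is an async generator, its chunks can be consumed with standard `asyncio` tooling. A minimal stdlib-only sketch of that consumption pattern, where `fake_run_stream` is a hypothetical stand-in for the agent and the yielded strings are illustrative, not real Index chunk types:

```python
import asyncio
from typing import AsyncGenerator


# Hypothetical stand-in for agent.run_stream: yields per-step updates,
# then one final chunk, just like the real streaming interface.
async def fake_run_stream(prompt: str) -> AsyncGenerator[str, None]:
    for step in ("Opening page", "Reading post", "Summarizing"):
        await asyncio.sleep(0)  # simulate asynchronous work per step
        yield f"step: {step}"
    yield f"final: summary for {prompt!r}"


async def main() -> list[str]:
    chunks = []
    # Same consumption pattern as `async for chunk in agent.run_stream(...)`
    async for chunk in fake_run_stream("find a post about AI"):
        chunks.append(chunk)
    return chunks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the real agent you would pattern-match on the chunk types (step chunks vs. the final output chunk) instead of collecting plain strings.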
![Index observability](static/traces.png)

### Run with remote CDP url

```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig


async def main():
    # Configure browser to connect to an existing Chrome DevTools Protocol endpoint
    browser_config = BrowserConfig(
        cdp_url=""
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219", enable_thinking=True, thinking_token_budget=2048)
    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com and find the top story"
    )
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```

### Run with local Chrome instance (programmatically)

```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig


async def main():
    # Configure browser to connect to a local Chrome instance
    browser_config = BrowserConfig(
        cdp_url="http://localhost:9222"
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219", enable_thinking=True, thinking_token_budget=2048)
    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com and find the top story"
    )
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```

### Customize browser window size

```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig


async def main():
    # Configure browser with custom viewport size
    browser_config = BrowserConfig(
        viewport_size={"width": 1200, "height": 900}
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219")
    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        "Navigate to a responsive website and capture how it looks in full HD resolution"
    )
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```

---

Made with ❤️ by the [Laminar team](https://lmnr.ai)

## /index/__init__.py

```py path="/index/__init__.py"
from index.agent.agent import Agent
from index.agent.models import ActionModel, ActionResult, AgentOutput
from index.browser.browser import Browser, BrowserConfig
from index.browser.detector import Detector
from index.browser.models import InteractiveElement
from index.llm.providers.anthropic import AnthropicProvider
from index.llm.providers.anthropic_bedrock import AnthropicBedrockProvider
from index.llm.providers.gemini import GeminiProvider
from index.llm.providers.openai import OpenAIProvider

__all__ = [
    'Agent',
    'Browser',
    'BrowserConfig',
    'ActionResult',
    'ActionModel',
    'AnthropicProvider',
    'AnthropicBedrockProvider',
    'OpenAIProvider',
    'GeminiProvider',
    'AgentOutput',
    'Detector',
    'InteractiveElement',
]
```

## /index/agent/agent.py

```py path="/index/agent/agent.py"
from __future__ import annotations

import json
import logging
import re
import time
import uuid
from typing import AsyncGenerator, Optional

from dotenv import load_dotenv
from lmnr import Laminar, LaminarSpanContext, observe, use_span
from pydantic import ValidationError

from index.agent.message_manager import MessageManager
from index.agent.models import (
    ActionResult,
    AgentLLMOutput,
    AgentOutput,
    AgentState,
    AgentStreamChunk,
    FinalOutputChunk,
    StepChunk,
    StepChunkContent,
    StepChunkError,
    TimeoutChunk,
    TimeoutChunkContent,
)
from index.browser.browser import Browser, BrowserConfig
from index.controller.controller import Controller
from index.llm.llm import BaseLLMProvider, Message

load_dotenv()

logger = logging.getLogger(__name__)


class Agent:
    def __init__(
        self,
        llm: BaseLLMProvider,
        browser_config: BrowserConfig | None = None
    ):
        self.llm = llm
        self.controller = Controller()

        # Initialize browser or use the provided one
        self.browser = Browser(config=browser_config if browser_config is not None else BrowserConfig())

        action_descriptions = self.controller.get_action_descriptions()

        self.message_manager = MessageManager(
            action_descriptions=action_descriptions,
        )

        self.state = AgentState(
            messages=[],
        )

    async def step(self, step: int, previous_result: ActionResult | None = None, step_span_context:
                   Optional[LaminarSpanContext] = None) -> tuple[ActionResult, str]:
        """Execute one step of the task"""
        with Laminar.start_as_current_span(
            name="agent.step",
            parent_span_context=step_span_context,
            input={
                "step": step,
            },
        ):
            state = await self.browser.update_state()

            if previous_result:
                self.message_manager.add_current_state_message(state, previous_result)

            input_messages = self.message_manager.get_messages()

            try:
                model_output = await self._generate_action(input_messages)
            except Exception as e:
                # model call failed, remove last state message from history before retrying
                self.message_manager.remove_last_message()
                raise e

            if previous_result:
                # we're removing the state message that we've just added because we want to append it in a different format
                self.message_manager.remove_last_message()

            self.message_manager.add_message_from_model_output(step, previous_result, model_output, state.screenshot)

            try:
                result: ActionResult = await self.controller.execute_action(
                    model_output.action,
                    self.browser
                )

                if result.is_done:
                    logger.info(f'Result: {result.content}')
                    self.final_output = result.content

                return result, model_output.summary
            except Exception as e:
                raise e

    @observe(name='agent.generate_action', ignore_input=True)
    async def _generate_action(self, input_messages: list[Message]) -> AgentLLMOutput:
        """Get next action from LLM based on current state"""
        response = await self.llm.call(input_messages)

        # Extract content between <output> tags using regex, including variations like <output_32>
        pattern = r"<output[^>]*>(.*?)</output[^>]*>"
        match = re.search(pattern, response.content, re.DOTALL)

        json_str = ""

        if not match:
            # if we couldn't find the tags, it most likely means the <output> tag is not present in the response
            # remove closing and opening tags just in case
            closing_tag_pattern = r"</output[^>]*>"
            json_str = re.sub(closing_tag_pattern, "", response.content).strip()

            open_tag_pattern = r"<output[^>]*>"
            json_str = re.sub(open_tag_pattern, "", json_str).strip()

            json_str = json_str.replace("\`\`\`json", "").replace("\`\`\`", "").strip()
        else:
            # Extract just the content between the tags without any additional replacement
            json_str = match.group(1).strip()

        try:
            # First try to parse it directly to catch any obvious JSON issues
            try:
                json.loads(json_str)
            except json.JSONDecodeError:
                # If direct parsing fails, attempt to fix common issues
                # Remove escape characters and control characters (0x00-0x1F) that might cause problems
                json_str = json_str.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t')
                # Clean all control characters (0x00-0x1F) except valid JSON whitespace (\n, \r, \t)
                json_str = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', json_str)

            output = AgentLLMOutput.model_validate_json(json_str.strip())

            logger.info(f'💡 Thought: {output.thought}')
            logger.info(f'💡 Summary: {output.summary}')
            logger.info(f'🛠️ Action: {output.action.model_dump_json(exclude_unset=True)}')

            if response.thinking:
                output.thinking_block = response.thinking

            return output
        except ValidationError as e:
            raise ValueError(f"Could not parse response: {str(e)}\nResponse was: {json_str}")

    async def _setup_messages(self, prompt: str, agent_state: str | None = None, start_url: str | None = None):
        """Set up messages based on state dict or initialize with system message"""
        if agent_state:
            # assuming that the structure of the state.messages is correct
            state = AgentState.model_validate_json(agent_state)
            self.message_manager.set_messages(state.messages)

            # Update browser_context to browser
            browser_state = await self.browser.update_state()
            self.message_manager.add_current_state_message(browser_state, user_follow_up_message=prompt)
        else:
            self.message_manager.add_system_message_and_user_prompt(prompt)

            if start_url:
                await self.browser.goto(start_url)

            browser_state = await self.browser.update_state()
            self.message_manager.add_current_state_message(browser_state)

    async def run(self, prompt: str, max_steps: int = 100, agent_state: str | None = None,
                  parent_span_context: Optional[LaminarSpanContext] = None,
                  close_context: bool = True,
                  session_id: str |
                  None = None,
                  return_agent_state: bool = False,
                  return_storage_state: bool = False,
                  start_url: str | None = None,
                  ) -> AgentOutput:
        """Execute the task with maximum number of steps and return the final result

        Args:
            prompt: The prompt to execute the task with
            max_steps: The maximum number of steps to execute the task with. Defaults to 100.
            agent_state: Optional, the state of the agent to execute the task with
            parent_span_context: Optional, parent span context in Laminar format to execute the task with
            close_context: Whether to close the browser context after the task is executed
            session_id: Optional, Agent session id
            return_agent_state: Whether to return the agent state with the final output
            return_storage_state: Whether to return the storage state with the final output
            start_url: Optional, the URL to start the task with
        """
        if prompt is None and agent_state is None:
            raise ValueError("Either prompt or agent_state must be provided")

        with Laminar.start_as_current_span(
            name="agent.run",
            parent_span_context=parent_span_context,
            input={
                "prompt": prompt,
                "max_steps": max_steps,
                "stream": False,
            },
        ) as span:
            if session_id is not None:
                span.set_attribute("lmnr.internal.agent_session_id", session_id)

            await self._setup_messages(prompt, agent_state, start_url)

            step = 0
            result = None
            is_done = False

            trace_id = str(uuid.UUID(int=span.get_span_context().trace_id))

            try:
                while not is_done and step < max_steps:
                    logger.info(f'📍 Step {step}')

                    result, _ = await self.step(step, result)
                    step += 1
                    is_done = result.is_done

                    if is_done:
                        logger.info(f'✅ Task completed successfully in {step} steps')
                        break

                if not is_done:
                    logger.info('❌ Maximum number of steps reached')

            except Exception as e:
                logger.info(f'❌ Error in run: {e}')
                raise e
            finally:
                storage_state = await self.browser.get_storage_state()

                if close_context:
                    # Update to close the browser directly
                    await self.browser.close()

                span.set_attribute("lmnr.span.output", result.model_dump_json())

            return AgentOutput(
                agent_state=self.get_state()
                if return_agent_state else None,
                result=result,
                storage_state=storage_state if return_storage_state else None,
                step_count=step,
                trace_id=trace_id,
            )

    async def run_stream(self, prompt: str, max_steps: int = 100, agent_state: str | None = None,
                         parent_span_context: Optional[LaminarSpanContext] = None,
                         close_context: bool = True,
                         timeout: Optional[int] = None,
                         session_id: str | None = None,
                         return_screenshots: bool = False,
                         return_agent_state: bool = False,
                         return_storage_state: bool = False,
                         start_url: str | None = None,
                         ) -> AsyncGenerator[AgentStreamChunk, None]:
        """Execute the task with maximum number of steps and stream step chunks as they happen

        Args:
            prompt: The prompt to execute the task with
            max_steps: The maximum number of steps to execute the task with
            agent_state: The state of the agent to execute the task with
            parent_span_context: Parent span context in Laminar format to execute the task with
            close_context: Whether to close the browser context after the task is executed
            timeout: The timeout for the task
            session_id: Agent session id
            return_screenshots: Whether to return screenshots with the step chunks
            return_agent_state: Whether to return the agent state with the final output chunk
            return_storage_state: Whether to return the storage state with the final output chunk
            start_url: Optional, the URL to start the task with
        """
        # Create a span for the streaming execution
        span = Laminar.start_span(
            name="agent.run_stream",
            parent_span_context=parent_span_context,
            input={
                "prompt": prompt,
                "max_steps": max_steps,
                "stream": True,
            },
        )

        trace_id = str(uuid.UUID(int=span.get_span_context().trace_id))

        if session_id is not None:
            span.set_attribute("lmnr.internal.agent_session_id", session_id)

        with use_span(span):
            await self._setup_messages(prompt, agent_state, start_url)

        step = 0
        result = None
        is_done = False

        if timeout is not None:
            start_time = time.time()

        try:
            # Execute steps and yield results
            while not is_done and step < max_steps:
                logger.info(f'📍 Step {step}')

                with use_span(span):
                    result, summary = await self.step(step, result)

                step += 1
                is_done = result.is_done

                screenshot = None
                if return_screenshots:
                    state = self.browser.get_state()
                    screenshot = state.screenshot

                if timeout is not None and time.time() - start_time > timeout:
                    yield TimeoutChunk(
                        content=TimeoutChunkContent(
                            action_result=result,
                            summary=summary,
                            step=step,
                            agent_state=self.get_state() if return_agent_state else None,
                            screenshot=screenshot,
                            trace_id=trace_id
                        )
                    )
                    return

                yield StepChunk(
                    content=StepChunkContent(
                        action_result=result,
                        summary=summary,
                        trace_id=trace_id,
                        screenshot=screenshot
                    )
                )

                if is_done:
                    logger.info(f'✅ Task completed successfully in {step} steps')

                    storage_state = await self.browser.get_storage_state()

                    # Yield the final output as a chunk
                    final_output = AgentOutput(
                        agent_state=self.get_state() if return_agent_state else None,
                        result=result,
                        storage_state=storage_state if return_storage_state else None,
                        step_count=step,
                        trace_id=trace_id,
                    )
                    span.set_attribute("lmnr.span.output", result.model_dump_json())

                    yield FinalOutputChunk(content=final_output)
                    break

            if not is_done:
                logger.info('❌ Maximum number of steps reached')
                yield StepChunkError(content=f'Maximum number of steps reached: {max_steps}')

        except Exception as e:
            logger.info(f'❌ Error in run: {e}')
            span.record_exception(e)
            yield StepChunkError(content=f'Error in run stream: {e}')
        finally:
            # Clean up resources
            if close_context:
                # Update to close the browser directly
                await self.browser.close()
            span.end()
            logger.info('Stream complete, span closed')

    def get_state(self) -> AgentState:
        self.state.messages = self.message_manager.get_messages()
        return self.state
```

## /index/agent/demo_images/complex_layout_highlight.png

Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/complex_layout_highlight.png

## /index/agent/demo_images/complex_layout_small_elements.png

Binary file available at
https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/complex_layout_small_elements.png ## /index/agent/demo_images/loading.png Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/loading.png ## /index/agent/demo_images/loading2.png Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/loading2.png ## /index/agent/demo_images/scroll.png Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/scroll.png ## /index/agent/message_manager.py ```py path="/index/agent/message_manager.py" from __future__ import annotations import logging from datetime import datetime from typing import List, Optional from index.agent.models import ActionResult, AgentLLMOutput from index.agent.prompts import system_message from index.agent.utils import load_demo_image_as_b64 from index.browser.models import BrowserState from index.browser.utils import scale_b64_image from index.llm.llm import ImageContent, Message, TextContent logger = logging.getLogger(__name__) class MessageManager: def __init__( self, action_descriptions: str, ): self._messages: List[Message] = [] self.action_descriptions = action_descriptions def add_system_message_and_user_prompt(self, prompt: str) -> None: complex_layout_highlight = load_demo_image_as_b64('complex_layout_highlight.png') complex_layout_small_elements = load_demo_image_as_b64('complex_layout_small_elements.png') still_loading = load_demo_image_as_b64('loading.png') still_loading_2 = load_demo_image_as_b64('loading2.png') scroll_over_element_example = load_demo_image_as_b64('scroll.png') system_msg = Message( role="system", content=[ TextContent(text=system_message(self.action_descriptions), cache_control=True), ], ) self._messages.append(system_msg) self._messages.append(Message( role="user", content=[ TextContent(text=''), 
TextContent(text="Here's an example of a complex layout. If you want to select the 'Roster' section for the Colorado Rockies, you need to click on the element with index 121."), ImageContent(image_b64=complex_layout_highlight), TextContent(text=''), TextContent(text=''), TextContent(text="Here's an example of small elements on the page and their functions. Element 7, represented by an 'x' icon, is a 'clear text' button. Element 8, represented by an '=' icon, is a 'submit' button. This clarification should help you better understand similar layouts."), ImageContent(image_b64=complex_layout_small_elements), TextContent(text=''), TextContent(text=''), TextContent(text="Here are some examples of loading pages. If the main content on the page is empty, or if there are loading elements such as skeleton screens, the page is still loading. In that case, you HAVE to perform the `wait_for_page_to_load` action."), ImageContent(image_b64=still_loading), ImageContent(image_b64=still_loading_2), TextContent(text=''), TextContent(text=''), TextContent(text="In some cases, to reveal more content, you need to scroll in scrollable areas of the webpage. Scrollable areas have VERTICAL scrollbars very clearly visible on their right side. In the screenshot below, you can clearly see a scrollbar on the right side of the list of search items. This indicates that the list is scrollable. To scroll over this area, identify any element within the scrollable area and use its index with the `scroll_down_over_element` action to scroll over it.
In this example, the appropriate element is the one with index 15."), ImageContent(image_b64=scroll_over_element_example), TextContent(text='', cache_control=True), TextContent(text=f"""Here is the task you need to complete:\n\n\n{prompt}\n Today's date and time is: {datetime.now().strftime('%B %d, %Y, %I:%M%p')} - keep this date and time in mind when planning your actions."""), ] )) def get_messages_as_state(self) -> List[Message]: """Get messages as state messages""" return [msg for msg in self._messages if msg.is_state_message] def remove_last_message(self) -> None: """Remove last message from history""" if len(self._messages) > 1: self._messages.pop() def add_current_state_message( self, state: BrowserState, previous_result: ActionResult | None = None, user_follow_up_message: str | None = None, ) -> None: """Add browser state as a user message""" if state.interactive_elements: highlighted_elements = '' for element in state.interactive_elements.values(): # exclude sheets elements if element.browser_agent_id.startswith("row_") or element.browser_agent_id.startswith("column_"): continue start_tag = f"[{element.index}]<{element.tag_name}" if element.input_type: start_tag += f" type=\"{element.input_type}\"" start_tag += ">" element_text = element.text.replace('\n', ' ') highlighted_elements += f"{start_tag}{element_text}\n" else: highlighted_elements = '' scroll_distance_above_viewport = state.viewport.scroll_distance_above_viewport or 0 scroll_distance_below_viewport = state.viewport.scroll_distance_below_viewport or 0 if scroll_distance_above_viewport > 0: elements_text = f'{scroll_distance_above_viewport}px scroll distance above current viewport\n' else: elements_text = '[Start of page]\n' if highlighted_elements != '': elements_text += f'\nHighlighted elements:\n{highlighted_elements}' if scroll_distance_below_viewport > 0: elements_text += f'\n{scroll_distance_below_viewport}px scroll distance below current viewport\n' else: elements_text += '\n[End of page]'
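        # Illustrative sketch (hypothetical values, not part of the original file):
        # for a state scrolled 600px down a page, with two highlighted elements and
        # 900px of content remaining below the fold, the assembled `elements_text`
        # produced by the branches above would read roughly:
        #
        #   600px scroll distance above current viewport
        #
        #   Highlighted elements:
        #   [1]<button>Submit
        #   [2]<input type="text">Search
        #
        #   900px scroll distance below current viewport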
previous_action_output = '' if previous_result: previous_action_output = f'\n{previous_result.content}\n\n\n' if previous_result.content else '' if previous_result.error: previous_action_output += f'\n{previous_result.error}\n\n\n' if user_follow_up_message: user_follow_up_message = f'\n{user_follow_up_message}\n\n\n' else: user_follow_up_message = '' state_description = f"""{previous_action_output}{user_follow_up_message} Current URL: {state.url} Open tabs: {state.tabs} Current viewport information: {elements_text} """ state_msg = Message( role='user', content=[ TextContent(text=state_description), TextContent(text=''), ImageContent(image_b64=state.screenshot), TextContent(text=''), TextContent(text=''), ImageContent(image_b64=state.screenshot_with_highlights), TextContent(text=''), ] ) self._messages.append(state_msg) def add_message_from_model_output(self, step: int, previous_result: ActionResult | None, model_output: AgentLLMOutput, screenshot: Optional[str] = None) -> None: """Add model output as AI message""" previous_action_output = '' for msg in self._messages: if msg.is_state_message: msg.content = [msg.content[0]] if previous_result and screenshot: previous_action_output = f'\n{previous_result.content}\n' if previous_result.content else '' if previous_result.error: previous_action_output += f'\n{previous_result.error}\n' usr_msg = Message( role='user', content=[ TextContent(text=previous_action_output, cache_control=True), TextContent(text=f""), ImageContent(image_b64=scale_b64_image(screenshot, 0.75)), TextContent(text=f""), ], is_state_message=True, ) self._messages.append(usr_msg) assistant_content = [ TextContent(text=f""" {model_output.model_dump_json(indent=2, include={"thought", "action", "summary"}).strip()} """), ] if model_output.thinking_block: assistant_content = [ model_output.thinking_block, ] + assistant_content msg = Message( role='assistant', content=assistant_content, ) self._messages.append(msg) def get_messages(self) -> List[Message]: 
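        # Editorial note (a sketch of the intent, inferred from the code below):
        # walking the history newest-to-oldest and stripping every cache_control
        # marker except the most recent one keeps a single, movable prompt-cache
        # breakpoint at the latest cacheable message; system messages are skipped,
        # so their cache_control survives. This matches how Anthropic-style prompt
        # caching is typically used (an assumption based on this repo's providers).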
found_first_cache_control = False # clear all past cache control except the latest one for msg in self._messages[::-1]: # ignore system messages if msg.role == 'system': continue if found_first_cache_control: msg.remove_cache_control() if msg.has_cache_control(): found_first_cache_control = True return self._messages def set_messages(self, messages: List[Message]) -> None: """Set messages""" self._messages = messages ``` ## /index/agent/models.py ```py path="/index/agent/models.py" from __future__ import annotations from typing import Any, Dict, Literal, Optional from playwright.async_api import StorageState from pydantic import BaseModel from index.llm.llm import Message, ThinkingBlock class AgentState(BaseModel): """State of the agent""" messages: list[Message] class ActionResult(BaseModel): """Result of executing an action""" is_done: Optional[bool] = False content: Optional[str] = None error: Optional[str] = None give_control: Optional[bool] = False class ActionModel(BaseModel): """Model for an action""" name: str params: Dict[str, Any] class AgentLLMOutput(BaseModel): """Output model for agent""" action: ActionModel thought: Optional[str] = None summary: Optional[str] = None thinking_block: Optional[ThinkingBlock] = None class AgentOutput(BaseModel): """Output model for agent""" agent_state: Optional[AgentState] = None result: ActionResult step_count: int = 0 storage_state: Optional[StorageState] = None trace_id: str | None = None class AgentStreamChunk(BaseModel): """Base class for chunks in the agent stream""" type: str class StepChunkContent(BaseModel): action_result: ActionResult summary: str trace_id: str | None = None screenshot: Optional[str] = None class StepChunk(AgentStreamChunk): """Chunk containing a step result""" type: Literal["step"] = "step" content: StepChunkContent class TimeoutChunkContent(BaseModel): action_result: ActionResult summary: str step: int agent_state: AgentState | None = None trace_id: str | None = None screenshot: Optional[str] 
= None class TimeoutChunk(AgentStreamChunk): """Chunk containing a timeout""" type: Literal["step_timeout"] = "step_timeout" content: TimeoutChunkContent class StepChunkError(AgentStreamChunk): """Chunk containing an error""" type: Literal["step_error"] = "step_error" content: str class FinalOutputChunk(AgentStreamChunk): """Chunk containing the final output""" type: Literal["final_output"] = "final_output" content: AgentOutput ``` ## /index/agent/prompts.py ```py path="/index/agent/prompts.py" def system_message(action_descriptions: str) -> str: return f"""You are an advanced AI assistant designed to interact with a web browser and complete user tasks. Your capabilities include analyzing web page screenshots, interacting with page elements, and navigating through websites to accomplish various objectives. First, let's review the available actions you can perform: {action_descriptions} Your goal is to complete the user's task by carefully analyzing the current state of the web page, planning your actions, reflecting on the outcomes of the previous actions, and avoiding repetition of unsuccessful approaches. Follow the guidelines below: 1. Element Identification: - Interactable elements on the page are enclosed in uniquely colored bounding boxes with numbered labels. - The label corresponding to a bounding box is placed at the top right corner of the bounding box and has the exact same color as the bounding box. If the label is larger than the bounding box, the label is placed right outside and tangent to the bounding box. - Carefully match labels to their corresponding bounding boxes based on the color and position of the label, as labels might slightly overlap with unrelated bounding boxes. - If a bounding box doesn't enclose any element, simply ignore it (most likely the bounding box was incorrectly detected). - Screenshot enclosed in tag contains a clean screenshot of the current browser window.
- Screenshot enclosed in tag has bounding boxes with labels drawn around interactable elements. - Carefully analyze both screenshots to understand the layout of the page and accurately map bounding boxes to their corresponding elements. - Remember: each bounding box and its corresponding label have the same unique color. 2. Element Interaction: - Infer the role and function of elements based on their appearance, the text/icon inside the element, and their location on the page. - Interact only with visible elements on the screen. - Before entering text into an input area, make sure that you have clicked on the target input area first. - Scroll or interact with elements to reveal more content if necessary information is not visible. - To scroll within areas with scrollbars, first identify any element inside the scrollable area and use its index with the `scroll_down_over_element` or `scroll_up_over_element` actions instead of scrolling the entire page. Pay attention to the scrollbar position and direction to identify the correct element. - Some pages have a navigation menu on the left, which might contain useful information, such as filters, categories, navigation, etc. Pay close attention to whether the side menu has scrollbars. If it does, scroll over it using an element within the side menu. - For clicking on a cell in a spreadsheet, first identify the correct column and row that correspond to the cell you want to click on. Then, strictly use the `click_on_spreadsheet_cell` action to click on the cell. Don't use the `click_element` action for interacting with spreadsheet cells. 3. Task Execution: - After you perform an action, analyze the state screenshot to verify that the intended result was achieved (filter was applied, correct date range was selected, text was entered, etc.). If the result was not achieved, identify the problem and fix it. Be creative and persistent in your approach and don't repeat the same actions that failed.
- Break down multi-step tasks into sub-tasks and complete each sub-task one by one. - Thoroughly explore all possible approaches before declaring the task complete. - If you encounter obstacles, consider alternative approaches such as returning to a previous page, initiating a new search, or opening a new tab. - Understand elements on the page and infer the most relevant ones for the current step of the task. - Ensure that your final output fully addresses all aspects of the user's request. - Include ALL requested information in the "done" action. Include markdown-formatted links where relevant and useful. - Important: For research tasks, be persistent and explore multiple results (at least 5-10) before giving up. - Be persistent and creative in your approach, e.g., using site-specific Google searches to find precise information. 4. Special Situations: - Cookie popups: Click "I accept" if present. If it persists after clicking, ignore it. - CAPTCHA: Attempt to solve logically. If unsuccessful, open a new tab and continue the task. 5. Returning control to a human: - For steps that require user information to proceed, such as providing a first name, last name, email, phone number, booking information, login, password, credit card information, credentials, etc., unless this information was provided in the initial prompt, you must use the `give_human_control` action to give the human control of the browser. - If you can't solve the CAPTCHA, use the `give_human_control` action to give the human control of the browser to aid you in solving the CAPTCHA. - Control is guaranteed to be returned to you after the human has entered the information or solved the CAPTCHA, so you should plan your next actions accordingly. 6. Source citations: - When you perform research tasks, include links to the websites where you found the information in your final output. - In general, include links to the websites where you found the information in your final output.
- Strictly use markdown format for the links, because the final output will be rendered as markdown. 7. Spreadsheet interaction: - To click on a cell in a spreadsheet, use the `click_on_spreadsheet_cell` action to click on a specific cell. DON'T use the `click_element` action for interacting with spreadsheet cells or other elements when the goal is to click on a specific cell. - To input text into a spreadsheet cell, first click on the cell using the `click_on_spreadsheet_cell` action, then use the `enter_text` action to input the text. Your response must always be in the following JSON format, enclosed in tags: {{ "thought": "EITHER a very short summary of your thinking process with key points OR exact information that you need to remember for the future (in case of research tasks).", "action": {{ "name": "action_name", "params": {{ "param1": "value1", "param2": "value2" }} }}, "summary": "Extremely brief summary of what you are doing to display to the user to help them understand what you are doing" }} Remember: - Think concisely. - Output only a single action per response. - You will be prompted again after each action. - Always provide an output in the specified JSON format, enclosed in tags. - Reflect on the outcomes of past actions to avoid repeating unsuccessful approaches. - Be creative and persistent in trying different strategies within the boundaries of the website. - Break down multi-step tasks into sub-tasks and complete each sub-task one by one. - For research tasks, be thorough and explore multiple results before concluding that the desired information is unavailable. Continue this process until you are absolutely certain that you have completed the user's task fully and accurately. Be thorough, creative, and persistent in your approach.
Your final output should consist only of the correctly formatted JSON object enclosed in tags and should not duplicate or rehash any of the work you did in the thinking block.""" ``` ## /index/agent/utils.py ```py path="/index/agent/utils.py" import base64 import importlib.resources import logging from index.browser.utils import scale_b64_image logger = logging.getLogger(__name__) def load_demo_image_as_b64(image_name: str) -> str: """ Load an image from the demo_images directory and return it as a base64 string. Works reliably whether the package is used directly or as a library. Args: image_name: Name of the image file (including extension) Returns: Base64 encoded string of the image """ try: # Using importlib.resources to reliably find package data with importlib.resources.path('index.agent.demo_images', image_name) as img_path: with open(img_path, 'rb') as img_file: b64 = base64.b64encode(img_file.read()).decode('utf-8') return scale_b64_image(b64, 0.75) except Exception as e: logger.error(f"Error loading demo image {image_name}: {e}") raise ``` ## /index/browser/browser.py ```py path="/index/browser/browser.py" """ Streamlined Playwright browser implementation. 
""" import asyncio import base64 import io import logging from dataclasses import dataclass, field from importlib import resources from typing import Any, Optional from lmnr import observe from PIL import Image from playwright.async_api import ( Browser as PlaywrightBrowser, ) from playwright.async_api import ( BrowserContext as PlaywrightBrowserContext, ) from playwright.async_api import ( Page, Playwright, StorageState, async_playwright, ) from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_exponential, ) from typing_extensions import TypedDict # to account for older python versions # Import detector class from index.browser.detector import Detector from index.browser.models import ( BrowserError, BrowserState, InteractiveElementsData, TabInfo, ) from index.browser.utils import ( filter_elements, put_highlight_elements_on_screenshot, scale_b64_image, ) logger = logging.getLogger(__name__) INTERACTIVE_ELEMENTS_JS_CODE = resources.read_text('index.browser', 'findVisibleInteractiveElements.js') class ViewportSize(TypedDict): width: int height: int @dataclass class BrowserConfig: """ Simplified configuration for the Browser. Parameters: cdp_url: Optional[str] = None Connect to a browser instance via CDP viewport_size: ViewportSize = {"width": 1024, "height": 768} Default browser window size storage_state: Optional[StorageState] = None Storage state to set detector: Optional[Detector] = None Detector instance for CV element detection. If None, CV detection is disabled. """ cdp_url: Optional[str] = None viewport_size: ViewportSize = field(default_factory=lambda: {"width": 1024, "height": 768}) storage_state: Optional[StorageState] = None detector: Optional[Detector] = None class Browser: """ Unified Browser responsible for interacting with the browser via Playwright. 
""" def __init__(self, config: BrowserConfig = BrowserConfig(), close_context: bool = True): logger.debug('Initializing browser') self.config = config self.close_context = close_context # Playwright-related attributes self.playwright: Optional[Playwright] = None self.playwright_browser: Optional[PlaywrightBrowser] = None self.context: Optional[PlaywrightBrowserContext] = None # Page and state management self.current_page: Optional[Page] = None self._state: Optional[BrowserState] = None self._cdp_session = None # CV detection-related attributes self.detector: Optional[Detector] = config.detector self.screenshot_scale_factor = None # Initialize state self._init_state() async def __aenter__(self): """Async context manager entry""" await self._init_browser() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" if self.close_context: await self.close() def _init_state(self, url: str = '') -> None: """Initialize browser state""" self._state = BrowserState( url=url, screenshot_with_highlights=None, tabs=[], interactive_elements={}, ) async def _init_browser(self): """Initialize the browser and context""" logger.debug('Initializing browser context') # Start playwright if needed if self.playwright is None: self.playwright = await async_playwright().start() # Initialize browser if needed if self.playwright_browser is None: if self.config.cdp_url: logger.info(f'Connecting to remote browser via CDP {self.config.cdp_url}') attempts = 0 while True: try: self.playwright_browser = await self.playwright.chromium.connect_over_cdp( self.config.cdp_url, timeout=2500, ) break except Exception as e: logger.error(f'Failed to connect to remote browser via CDP {self.config.cdp_url}: {e}. 
Retrying...') await asyncio.sleep(1) attempts += 1 if attempts > 3: raise e logger.info(f'Connected to remote browser via CDP {self.config.cdp_url}') else: logger.info('Launching new browser instance') self.playwright_browser = await self.playwright.chromium.launch( headless=False, args=[ '--no-sandbox', '--disable-blink-features=AutomationControlled', '--disable-web-security', '--disable-site-isolation-trials', '--disable-features=IsolateOrigins,site-per-process', f'--window-size={self.config.viewport_size["width"]},{self.config.viewport_size["height"]}', ] ) # Create context if needed if self.context is None: if len(self.playwright_browser.contexts) > 0: self.context = self.playwright_browser.contexts[0] else: self.context = await self.playwright_browser.new_context( viewport=self.config.viewport_size, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36', java_script_enabled=True, bypass_csp=True, ignore_https_errors=True ) # Apply anti-detection scripts await self._apply_anti_detection_scripts() self.context.on('page', self._on_page_change) if self.config.storage_state and 'cookies' in self.config.storage_state: await self.context.add_cookies(self.config.storage_state['cookies']) # Create page if needed if self.current_page is None: if len(self.context.pages) > 0: self.current_page = self.context.pages[-1] else: self.current_page = await self.context.new_page() return self async def _on_page_change(self, page: Page): """Handle page change events""" logger.info(f'Current page changed to {page.url}') self._cdp_session = await self.context.new_cdp_session(page) self.current_page = page async def _apply_anti_detection_scripts(self): """Apply scripts to avoid detection as automation""" await self.context.add_init_script( """ // Webdriver property Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // Languages Object.defineProperty(navigator, 'languages', { get: () => 
['en-US'] }); // Plugins Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] }); // Chrome runtime window.chrome = { runtime: {} }; // Permissions const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); (function () { const originalAttachShadow = Element.prototype.attachShadow; Element.prototype.attachShadow = function attachShadow(options) { return originalAttachShadow.call(this, { ...options, mode: "open" }); }; })(); """ ) async def close(self): """Close the browser instance and cleanup resources""" logger.debug('Closing browser') try: # Close CDP session if exists self._cdp_session = None # Close context if self.context: try: await self.context.close() except Exception as e: logger.debug(f'Failed to close context: {e}') self.context = None # Close browser if self.playwright_browser: try: await self.playwright_browser.close() except Exception as e: logger.debug(f'Failed to close browser: {e}') self.playwright_browser = None # Stop playwright if self.playwright: await self.playwright.stop() self.playwright = None except Exception as e: logger.error(f'Error during browser cleanup: {e}') finally: self.context = None self.current_page = None self._state = None self.playwright_browser = None self.playwright = None async def goto(self, url: str): """Navigate to a URL""" page = await self.get_current_page() await page.goto(url, wait_until='domcontentloaded') await asyncio.sleep(2) async def get_tabs_info(self) -> list[TabInfo]: """Get information about all tabs""" tabs_info = [] for page_id, page in enumerate(self.context.pages): tab_info = TabInfo(page_id=page_id, url=page.url, title=await page.title()) tabs_info.append(tab_info) return tabs_info async def switch_to_tab(self, page_id: int) -> None: """Switch to a specific tab by its page_id""" if self.context is None: 
await self._init_browser() pages = self.context.pages if page_id >= len(pages): raise BrowserError(f'No tab found with page_id: {page_id}') page = pages[page_id] self.current_page = page await page.bring_to_front() await page.wait_for_load_state() async def create_new_tab(self, url: str | None = None) -> None: """Create a new tab and optionally navigate to a URL""" if self.context is None: await self._init_browser() new_page = await self.context.new_page() self.current_page = new_page await new_page.wait_for_load_state() if url: await new_page.goto(url, wait_until='domcontentloaded') async def close_current_tab(self): """Close the current tab""" if self.current_page is None: return await self.current_page.close() # Switch to the first available tab if any exist if self.context and self.context.pages: await self.switch_to_tab(0) async def get_current_page(self) -> Page: """Get the current page""" if self.current_page is None: await self._init_browser() return self.current_page def get_state(self) -> BrowserState: """Get the current browser state""" return self._state @observe(name='browser.update_state', ignore_output=True) async def update_state(self) -> BrowserState: """Update the browser state with current page information and return it""" self._state = await self._update_state() return self._state @observe(name='browser._update_state', ignore_output=True) async def _update_state(self) -> BrowserState: """Update and return state.""" @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2), retry=retry_if_exception_type((Exception)), reraise=True ) async def get_stable_state(): if self.current_page is None: await self._init_browser() url = self.current_page.url detect_sheets = 'docs.google.com/spreadsheets/d' in url screenshot_b64 = await self.fast_screenshot() interactive_elements_data = await self.get_interactive_elements(screenshot_b64, detect_sheets) interactive_elements = {element.index: element for element in 
interactive_elements_data.elements} # Create highlighted version of the screenshot screenshot_with_highlights = put_highlight_elements_on_screenshot( interactive_elements, screenshot_b64 ) tabs = await self.get_tabs_info() return BrowserState( url=url, tabs=tabs, screenshot_with_highlights=screenshot_with_highlights, screenshot=screenshot_b64, viewport=interactive_elements_data.viewport, interactive_elements=interactive_elements, ) try: self._state = await get_stable_state() return self._state except Exception as e: logger.error(f'Failed to update state after multiple attempts: {str(e)}') # Return last known good state if available if hasattr(self, '_state'): return self._state raise @observe(name='browser.detect_browser_elements') async def detect_browser_elements(self) -> InteractiveElementsData: """Get all interactive elements on the page""" page = await self.get_current_page() result = await page.evaluate(INTERACTIVE_ELEMENTS_JS_CODE) interactive_elements_data = InteractiveElementsData(**result) return interactive_elements_data @observe(name='browser.get_interactive_elements', ignore_output=True) async def get_interactive_elements(self, screenshot_b64: str, detect_sheets: bool = False) -> InteractiveElementsData: """ Get interactive elements using combined browser and CV detection. Args: screenshot_b64: Optional base64 encoded screenshot. If None, a new screenshot will be taken. 
detect_sheets: Whether to detect sheets elements Returns: Combined detection results """ elements = [] if self.detector is not None: browser_elements_data = await self.detect_browser_elements() scale_factor = browser_elements_data.viewport.width / 1024 cv_elements = await self.detector.detect_from_image(screenshot_b64, scale_factor, detect_sheets) # Combine and filter detections elements = filter_elements(browser_elements_data.elements + cv_elements) else: browser_elements_data = await self.detect_browser_elements() elements = browser_elements_data.elements # Create new InteractiveElementsData with combined elements return InteractiveElementsData( viewport=browser_elements_data.viewport, elements=elements ) async def get_cdp_session(self): """Get or create a CDP session for the current page""" # Create a new session if we don't have one or the page has changed if (self._cdp_session is None or not hasattr(self._cdp_session, '_page') or self._cdp_session._page != self.current_page): self._cdp_session = await self.context.new_cdp_session(self.current_page) # Store reference to the page this session belongs to self._cdp_session._page = self.current_page return self._cdp_session @observe(name='browser.take_screenshot', ignore_output=True) async def fast_screenshot(self) -> str: """ Returns a base64 encoded screenshot of the current page. 
        Returns:
            Base64 encoded screenshot
        """
        # Use cached CDP session instead of creating a new one each time
        cdp_session = await self.get_cdp_session()
        screenshot_params = {
            "format": "png",
            "fromSurface": False,
            "captureBeyondViewport": False,
        }

        # Capture screenshot using CDP Session
        screenshot_data = await cdp_session.send("Page.captureScreenshot", screenshot_params)
        screenshot_b64 = screenshot_data["data"]

        if self.screenshot_scale_factor is None:
            test_img_data = base64.b64decode(screenshot_b64)
            test_img = Image.open(io.BytesIO(test_img_data))
            logger.info(f'Test image size: {test_img.size}')
            self.screenshot_scale_factor = 1024 / test_img.size[0]
            logger.info(f'Screenshot scale factor: {self.screenshot_scale_factor}')

        screenshot_b64 = scale_b64_image(screenshot_b64, self.screenshot_scale_factor)
        return screenshot_b64

    async def get_cookies(self) -> list[dict[str, Any]]:
        """Get cookies from the browser"""
        if self.context:
            cookies = await self.context.cookies()
            return cookies
        return []

    async def get_storage_state(self) -> dict[str, Any]:
        """Get local storage from the browser"""
        if self.context:
            cookies = await self.context.cookies()
            return {
                'cookies': cookies,
            }
        return {}
```

## /index/browser/detector.py

```py path="/index/browser/detector.py"
"""
Computer vision detector module.
"""

from abc import ABC, abstractmethod
from typing import List

from index.browser.models import InteractiveElement


class Detector(ABC):
    """Abstract interface for object detection in browser screenshots."""

    @abstractmethod
    async def detect_from_image(self, image_b64: str, scale_factor: float, detect_sheets: bool = False) -> List[InteractiveElement]:
        """
        Detect interactive elements from a base64 encoded image.

        Args:
            image_b64: Base64 encoded image screenshot.
            scale_factor: Scale factor to scale the coordinates of screenshot to browser viewport coordinates.
            detect_sheets: Flag to indicate if specialized sheet detection should be used.

        Returns:
            List of detected InteractiveElement objects.
""" pass ``` ## /index/browser/findVisibleInteractiveElements.js ```js path="/index/browser/findVisibleInteractiveElements.js" () => { console.time('totalExecutionTime'); // Define element weights for interactive likelihood - moved to higher scope const elementWeights = { 'button': 10, 'a': 10, 'input': 10, 'select': 10, 'textarea': 10, 'summary': 8, 'details': 7, 'label': 5, // Labels are clickable but not always interactive 'option': 7, 'tr': 4, 'th': 3, 'td': 3, 'li': 8, 'div': 2, 'span': 1, 'img': 2, 'svg': 3, 'path': 3 }; function generateUniqueId() { const rand = Math.random().toString(36); return `ba-${rand}`; } // Add this helper function to check element coverage function isElementTooBig(rect) { const viewportWidth = window.innerWidth || document.documentElement.clientWidth; const viewportHeight = window.innerHeight || document.documentElement.clientHeight; const viewportArea = viewportWidth * viewportHeight; // Calculate visible area of the element const visibleWidth = Math.min(rect.right, viewportWidth) - Math.max(rect.left, 0); const visibleHeight = Math.min(rect.bottom, viewportHeight) - Math.max(rect.top, 0); const visibleArea = visibleWidth * visibleHeight; // Check if element covers more than 50% of viewport return (visibleArea / viewportArea) > 0.5; } // Helper function to check if element is in the visible viewport function isInViewport(rect) { // Get viewport dimensions const viewportWidth = window.innerWidth || document.documentElement.clientWidth; const viewportHeight = window.innerHeight || document.documentElement.clientHeight; // Element must have meaningful size if (rect.width < 2 || rect.height < 2) { return false; } // Check if substantial part of the element is in viewport (at least 30%) const visibleWidth = Math.min(rect.right, viewportWidth) - Math.max(rect.left, 0); const visibleHeight = Math.min(rect.bottom, viewportHeight) - Math.max(rect.top, 0); if (visibleWidth <= 0 || visibleHeight <= 0) { return false; // Not in viewport at all 
} const visibleArea = visibleWidth * visibleHeight; const totalArea = rect.width * rect.height; const visiblePercent = visibleArea / totalArea; return visiblePercent >= 0.3; // At least 30% visible } // Helper function to get correct bounding rectangle, accounting for iframes function getAdjustedBoundingClientRect(element, contextInfo = null) { const rect = element.getBoundingClientRect(); // If element is in an iframe, adjust coordinates if (contextInfo && contextInfo.iframe) { const iframeRect = contextInfo.iframe.getBoundingClientRect(); return { top: rect.top + iframeRect.top, right: rect.right + iframeRect.left, bottom: rect.bottom + iframeRect.top, left: rect.left + iframeRect.left, width: rect.width, height: rect.height }; } return rect; } // Helper function to check if element is the top element at its position function isTopElement(element) { try { const rect = getAdjustedBoundingClientRect(element, element._contextInfo); const centerX = rect.left + rect.width / 2; const centerY = rect.top + rect.height / 2; // Check if the element is visible at its center point const elementsAtPoint = document.elementsFromPoint(centerX, centerY); // Nothing at this point (might be covered by an overlay) if (!elementsAtPoint || elementsAtPoint.length === 0) { return false; } // Handle iframe cases if (element._contextInfo && element._contextInfo.iframe) { // For elements in iframes, check if the iframe itself is the top-level element // then check if the element is topmost within that iframe const iframe = element._contextInfo.iframe; // First check if iframe is visible at the adjusted center point const iframeVisibleAtPoint = elementsAtPoint.includes(iframe); if (!iframeVisibleAtPoint) { return false; } // Then check if element is topmost within the iframe try { const iframeDoc = iframe.contentDocument || iframe.contentWindow.document; // Convert coordinates to iframe's local coordinate system const iframeRect = iframe.getBoundingClientRect(); const localX = centerX - 
iframeRect.left; const localY = centerY - iframeRect.top; const elementAtPointInIframe = iframeDoc.elementFromPoint(localX, localY); if (!elementAtPointInIframe) return false; return elementAtPointInIframe === element || element.contains(elementAtPointInIframe) || elementAtPointInIframe.contains(element); } catch (e) { console.warn('Error checking element position in iframe:', e); return false; } } // Handle shadow DOM cases if (element._contextInfo && element._contextInfo.shadowHost) { // For shadow DOM elements, first check if its shadow host is visible const shadowHost = element._contextInfo.shadowHost; const shadowHostVisible = elementsAtPoint.includes(shadowHost); if (!shadowHostVisible) { return false; } // Shadow DOM elements aren't directly accessible via elementsFromPoint // So we're simplifying and assuming visibility based on the host visibility return true; } const elementAtPoint = document.elementFromPoint(centerX, centerY); if (!elementAtPoint) return false; // Check if the element at this point is our element or a descendant/ancestor of our element return element === elementAtPoint || element.contains(elementAtPoint) || elementAtPoint.contains(element); } catch (e) { console.warn('Error in isTopElement check:', e); return false; } } // Add helper function to get effective z-index function getEffectiveZIndex(element) { let current = element; let zIndex = 'auto'; while (current && current !== document) { const style = window.getComputedStyle(current); if (style.position !== 'static' && style.zIndex !== 'auto') { zIndex = parseInt(style.zIndex, 10); break; } current = current.parentElement; } return zIndex === 'auto' ? 
0 : zIndex; } // Function to find all interactive elements function findInteractiveElements() { console.time('findInteractiveElements'); // Batch selectors for better performance const selectors = { highPriority: 'button, a[href], input:not([type="hidden"]), select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="menuitem"], [role="tab"], li[role="option"], [role="switch"]', mediumPriority: 'details, summary, svg, path, td, [role="option"], [role="radio"], [role="switch"], [tabindex]:not([tabindex="-1"]), [aria-label], [aria-labelledby]', lowPriority: '[onclick], .clickable, .btn, .button, .nav-item, .menu-item' }; // Process only elements in viewport for better performance const allElements = []; const processedElements = new Set(); const viewportElements = []; // Function to query elements within a document or shadow root function queryElementsInContext(context, selector) { try { return context.querySelectorAll(selector); } catch (e) { console.warn('Error querying for elements:', e); return []; } } // Function to process a document or shadow root function processContext(context, contextInfo = { iframe: null, shadowHost: null }) { // Process elements in priority order Object.keys(selectors).forEach(priority => { try { const elements = queryElementsInContext(context, selectors[priority]); for (let i = 0; i < elements.length; i++) { const element = elements[i]; // Skip already processed if (processedElements.has(element)) { continue; } processedElements.add(element); // Add context information to the element element._contextInfo = contextInfo; allElements.push(element); } } catch (e) { console.warn(`Error processing ${priority} elements:`, e); } }); // Process shadow DOM const shadowHosts = queryElementsInContext(context, '*'); for (let i = 0; i < shadowHosts.length; i++) { const host = shadowHosts[i]; if (host.shadowRoot) { processContext( host.shadowRoot, { iframe: contextInfo.iframe, shadowHost: host } ); } } } // Process main document 
processContext(document); // Process iframes try { const iframes = document.querySelectorAll('iframe'); for (let i = 0; i < iframes.length; i++) { const iframe = iframes[i]; // Skip iframes from different origins try { // This will throw if cross-origin const iframeDoc = iframe.contentDocument || iframe.contentWindow.document; processContext(iframeDoc, { iframe: iframe, shadowHost: null }); } catch (e) { console.warn('Could not access iframe content (likely cross-origin):', e); } } } catch (e) { console.warn('Error processing iframes:', e); } // Process cursor:pointer elements in all contexts function processCursorPointerElements(context, contextInfo = { iframe: null, shadowHost: null }) { try { const allElementsInContext = queryElementsInContext(context, '*'); for (let i = 0; i < allElementsInContext.length; i++) { const element = allElementsInContext[i]; // Skip already processed if (processedElements.has(element)) { continue; } // Quick check before expensive operations const rect = getAdjustedBoundingClientRect(element, contextInfo); if (!isInViewport(rect)) { continue; } // Check style if (isTopElement(element) && window.getComputedStyle(element).cursor === 'pointer') { // Add context information to the element element._contextInfo = contextInfo; processedElements.add(element); allElements.push(element); viewportElements.push({ element: element, rect: rect, weight: 1, zIndex: getEffectiveZIndex(element) }); } // Process shadow DOM of this element if (element.shadowRoot) { processCursorPointerElements( element.shadowRoot, { iframe: contextInfo.iframe, shadowHost: element } ); } } } catch (e) { console.warn('Error processing cursor:pointer elements:', e); } } // Process cursor:pointer elements in the main document processCursorPointerElements(document); // Process cursor:pointer elements in iframes try { const iframes = document.querySelectorAll('iframe'); for (let i = 0; i < iframes.length; i++) { const iframe = iframes[i]; try { const iframeDoc = 
iframe.contentDocument || iframe.contentWindow.document; processCursorPointerElements(iframeDoc, { iframe: iframe, shadowHost: null }); } catch (e) { // Already logged in previous iframe processing } } } catch (e) { // Already logged in previous iframe processing } // Filter for visible elements for (let i = 0; i < allElements.length; i++) { const element = allElements[i]; // Skip detailed processing if not in viewport const rect = getAdjustedBoundingClientRect(element, element._contextInfo); if (!isInViewport(rect)) { continue; } // Skip disabled elements if (element.hasAttribute('disabled') || element.getAttribute('aria-disabled') === 'true') { continue; } // Add check for too-large elements if (isElementTooBig(rect)) { continue; // Skip elements that cover more than 50% of viewport } // Check if the element is the top element at its position if (!isTopElement(element)) { continue; } // Calculate element weight let weight = elementWeights[element.tagName.toLowerCase()] || 1; // Boost weight for elements with specific attributes if (element.getAttribute('role') === 'button') weight = Math.max(weight, 8); if (element.hasAttribute('onclick')) weight = Math.max(weight, 7); if (element.hasAttribute('href')) weight = Math.max(weight, 8); if (window.getComputedStyle(element).cursor === 'pointer') weight = Math.max(weight, 4); // Add to viewport elements viewportElements.push({ element: element, rect: rect, weight: weight, zIndex: getEffectiveZIndex(element) }); // Add this to the code that processes each element element.setAttribute('data-element-index', i); // Add a unique identifier attribute to the element const uniqueId = generateUniqueId(); element.setAttribute('data-browser-agent-id', uniqueId); } console.timeEnd('findInteractiveElements'); console.log(`Found ${viewportElements.length} interactive elements in viewport (out of ${allElements.length} total)`); return viewportElements; } // Calculate Intersection over Union (IoU) between two rectangles function 
calculateIoU(rect1, rect2) { // Calculate area of each rectangle const area1 = (rect1.right - rect1.left) * (rect1.bottom - rect1.top); const area2 = (rect2.right - rect2.left) * (rect2.bottom - rect2.top); // Calculate intersection const intersectLeft = Math.max(rect1.left, rect2.left); const intersectTop = Math.max(rect1.top, rect2.top); const intersectRight = Math.min(rect1.right, rect2.right); const intersectBottom = Math.min(rect1.bottom, rect2.bottom); // Check if intersection exists if (intersectRight < intersectLeft || intersectBottom < intersectTop) { return 0; // No intersection } // Calculate area of intersection const intersectionArea = (intersectRight - intersectLeft) * (intersectBottom - intersectTop); // Calculate union area const unionArea = area1 + area2 - intersectionArea; // Calculate IoU return intersectionArea / unionArea; } // Check if rect1 is fully contained within rect2 function isFullyContained(rect1, rect2) { return rect1.left >= rect2.left && rect1.right <= rect2.right && rect1.top >= rect2.top && rect1.bottom <= rect2.bottom; } // Filter overlapping elements using weight and IoU function filterOverlappingElements(elements) { console.time('filterOverlappingElements'); // Sort by area (descending - larger first), then by weight (descending) for same area elements.sort((a, b) => { // Calculate areas const areaA = a.rect.width * a.rect.height; const areaB = b.rect.width * b.rect.height; // Sort by area first (larger area first) if (areaB !== areaA) { return areaB - areaA; // Larger area first } // For same area, sort by weight (higher weight first) return b.weight - a.weight; }); const filteredElements = []; const iouThreshold = 0.7; // Threshold for considering elements as overlapping // Add elements one by one, checking against already added elements for (let i = 0; i < elements.length; i++) { const current = elements[i]; let shouldAdd = true; // For each element already in our filtered list for (let j = 0; j < filteredElements.length; 
j++) { const existing = filteredElements[j]; // Convert DOMRect to plain object for IoU calculation const currentRect = { left: current.rect.left, top: current.rect.top, right: current.rect.right, bottom: current.rect.bottom }; const existingRect = { left: existing.rect.left, top: existing.rect.top, right: existing.rect.right, bottom: existing.rect.bottom }; // Check for high overlap const iou = calculateIoU(currentRect, existingRect); if (iou > iouThreshold) { shouldAdd = false; break; } // Check if current element is fully contained within an existing element with higher weight if (existing.weight >= current.weight && isFullyContained(currentRect, existingRect) && existing.zIndex === current.zIndex) { shouldAdd = false; break; } } if (shouldAdd) { filteredElements.push(current); } } console.timeEnd('filterOverlappingElements'); return filteredElements; } // Main function to get interactive elements with coordinates function getInteractiveElementsData() { // Find all potential interactive elements const potentialElements = findInteractiveElements(); // Filter out overlapping elements const filteredElements = filterOverlappingElements(potentialElements); console.log(`Filtered to ${filteredElements.length} non-overlapping elements`); // Sort elements by position (top-to-bottom, left-to-right) const sortedElements = sortElementsByPosition(filteredElements); // Prepare result with viewport metadata const result = { viewport: { width: window.innerWidth, height: window.innerHeight, scrollX: Math.round(window.scrollX), scrollY: Math.round(window.scrollY), devicePixelRatio: window.devicePixelRatio || 1, scrollDistanceAboveViewport: Math.round(window.scrollY), scrollDistanceBelowViewport: Math.round(document.documentElement.scrollHeight - window.scrollY - window.innerHeight) }, elements: [] }; // Process each interactive element (now sorted by position) sortedElements.forEach((item, index) => { const element = item.element; const rect = item.rect; // Ensure each element 
has a index_id let browserId = element.getAttribute('data-browser-agent-id'); if (!browserId) { const uniqueId = generateUniqueId(); element.setAttribute('data-browser-agent-id', uniqueId); browserId = uniqueId; } // Get element text (direct or from children) let text = element.innerText || ''; if (!text) { const textNodes = Array.from(element.childNodes) .filter(node => node.nodeType === Node.TEXT_NODE) .map(node => node.textContent.trim()) .filter(content => content.length > 0); text = textNodes.join(' '); } // Extract important attributes const attributes = {}; ['id', 'class', 'href', 'type', 'name', 'value', 'placeholder', 'aria-label', 'title', 'role'].forEach(attr => { if (element.hasAttribute(attr)) { attributes[attr] = element.getAttribute(attr); } }); // Determine input type and element role more clearly let elementType = element.tagName.toLowerCase(); let inputType = null; // Handle input elements specifically if (elementType === 'input' && element.hasAttribute('type')) { inputType = element.getAttribute('type').toLowerCase(); } // scaledRect is for coordinates scaled to 1024 width const scaleFactor = 1024 / window.innerWidth const scaledRect = { left: Math.round(rect.left * scaleFactor), top: Math.round(rect.top * scaleFactor), right: Math.round(rect.right * scaleFactor), bottom: Math.round(rect.bottom * scaleFactor), width: Math.round(rect.width * scaleFactor), height: Math.round(rect.height * scaleFactor), } // Create element data object const elementData = { tagName: elementType, text: text.trim(), attributes, index, weight: item.weight, browserAgentId: browserId, // Use the guaranteed ID inputType: inputType, // Add specific input type viewport: { x: Math.round(rect.left), y: Math.round(rect.top), width: Math.round(rect.width), height: Math.round(rect.height) }, page: { x: Math.round(rect.left + window.scrollX), y: Math.round(rect.top + window.scrollY), width: Math.round(rect.width), height: Math.round(rect.height) }, center: { x: 
Math.round(rect.left + rect.width/2), y: Math.round(rect.top + rect.height/2) }, rect: scaledRect, zIndex: item.zIndex }; // Add context information for iframe or shadow DOM if applicable if (element._contextInfo) { elementData.context = {}; // Add iframe information if element is within an iframe if (element._contextInfo.iframe) { const iframeRect = element._contextInfo.iframe.getBoundingClientRect(); elementData.context.iframe = { id: element._contextInfo.iframe.id || null, name: element._contextInfo.iframe.name || null, src: element._contextInfo.iframe.src || null, rect: { x: Math.round(iframeRect.left), y: Math.round(iframeRect.top), width: Math.round(iframeRect.width), height: Math.round(iframeRect.height) } }; } // Add shadow DOM information if element is within a shadow DOM if (element._contextInfo.shadowHost) { const shadowHost = element._contextInfo.shadowHost; const shadowHostRect = shadowHost.getBoundingClientRect(); elementData.context.shadowDOM = { hostTagName: shadowHost.tagName.toLowerCase(), hostId: shadowHost.id || null, hostRect: { x: Math.round(shadowHostRect.left), y: Math.round(shadowHostRect.top), width: Math.round(shadowHostRect.width), height: Math.round(shadowHostRect.height) } }; } } result.elements.push(elementData); }); return result; } // Add new function to sort elements by position function sortElementsByPosition(elements) { // Define what "same row" means (elements within this Y-distance are considered in the same row) const ROW_THRESHOLD = 20; // pixels // First, group elements into rows based on their Y position const rows = []; let currentRow = []; // Copy elements to avoid modifying the original array const sortedByY = [...elements].sort((a, b) => { return a.rect.top - b.rect.top; }); // Group into rows sortedByY.forEach(element => { if (currentRow.length === 0) { // Start a new row currentRow.push(element); } else { // Check if this element is in the same row as the previous ones const lastElement = currentRow[currentRow.length 
- 1]; if (Math.abs(element.rect.top - lastElement.rect.top) <= ROW_THRESHOLD) { // Same row currentRow.push(element); } else { // New row rows.push([...currentRow]); currentRow = [element]; } } }); // Add the last row if not empty if (currentRow.length > 0) { rows.push(currentRow); } // Sort each row by X position (left to right) rows.forEach(row => { row.sort((a, b) => a.rect.left - b.rect.left); }); // Flatten the rows back into a single array return rows.flat(); } // Execute and measure performance console.time('getInteractiveElements'); const result = getInteractiveElementsData(); console.timeEnd('getInteractiveElements'); console.timeEnd('totalExecutionTime'); return result; }; ``` ## /index/browser/fonts/OpenSans-Medium.ttf Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/browser/fonts/OpenSans-Medium.ttf ## /index/browser/models.py ```py path="/index/browser/models.py" from dataclasses import dataclass, field from typing import Optional from pydantic import BaseModel, ConfigDict from pydantic.alias_generators import to_camel # Pydantic class TabInfo(BaseModel): """Represents information about a browser tab""" page_id: int url: str title: str class Coordinates(BaseModel): x: int y: int width: Optional[int] = None height: Optional[int] = None class Rect(BaseModel): left: int top: int right: int bottom: int width: int height: int class InteractiveElement(BaseModel): """Represents an interactive element on the page""" model_config = ConfigDict( alias_generator=to_camel, populate_by_name=True, from_attributes=True, ) index: int tag_name: str text: str attributes: dict[str, str] viewport: Coordinates page: Coordinates center: Coordinates weight: float browser_agent_id: str input_type: Optional[str] = field(default=None) rect: Rect z_index: int class BrowserError(Exception): """Base class for all browser errors""" class URLNotAllowedError(BrowserError): """Error raised when a URL is not allowed""" class 
Viewport(BaseModel): """Represents the viewport of the browser""" model_config = ConfigDict( alias_generator=to_camel, populate_by_name=True, from_attributes=True, ) width: int = field(default_factory=lambda: 1024) height: int = field(default_factory=lambda: 768) scroll_x: int = field(default_factory=lambda: 0) scroll_y: int = field(default_factory=lambda: 0) device_pixel_ratio: float = field(default_factory=lambda: 1) scroll_distance_above_viewport: int = field(default_factory=lambda: 0) scroll_distance_below_viewport: int = field(default_factory=lambda: 0) class InteractiveElementsData(BaseModel): """Represents the data returned by the interactive elements script""" viewport: Viewport elements: list[InteractiveElement] @dataclass class BrowserState: url: str tabs: list[TabInfo] viewport: Viewport = field(default_factory=Viewport) screenshot_with_highlights: Optional[str] = None screenshot: Optional[str] = None interactive_elements: dict[int, InteractiveElement] = field(default_factory=dict) ``` ## /index/browser/utils.py ```py path="/index/browser/utils.py" import base64 import logging from io import BytesIO from pathlib import Path from typing import List from PIL import Image, ImageDraw, ImageFont from index.browser.models import InteractiveElement, Rect logger = logging.getLogger(__name__) def put_highlight_elements_on_screenshot(elements: dict[int, InteractiveElement], screenshot_b64: str) -> str: """Highlight elements using Pillow instead of OpenCV""" try: # Decode base64 to PIL Image image_data = base64.b64decode(screenshot_b64) image = Image.open(BytesIO(image_data)) draw = ImageDraw.Draw(image) # Colors (RGB format for PIL) base_colors = [ (204, 0, 0), (0, 136, 0), (0, 0, 204), (204, 112, 0), (102, 0, 102), (0, 102, 102), (204, 51, 153), (44, 0, 102), (204, 35, 0), (28, 102, 66), (170, 0, 0), (36, 82, 123) ] placed_labels = [] def generate_unique_color(base_color, element_idx): """Generate a unique color variation based on element index""" r, g, b = 
base_color # Use prime numbers to create deterministic but non-repeating patterns offset_r = (element_idx * 17) % 31 - 15 # Range: -15 to 15 offset_g = (element_idx * 23) % 29 - 14 # Range: -14 to 14 offset_b = (element_idx * 13) % 27 - 13 # Range: -13 to 13 # Ensure RGB values stay within 0-255 range r = max(0, min(255, r + offset_r)) g = max(0, min(255, g + offset_g)) b = max(0, min(255, b + offset_b)) return (r, g, b) # Load custom font from the package try: # Path to your packaged font font_path = Path(__file__).parent / "fonts" / "OpenSans-Medium.ttf" font = ImageFont.truetype(str(font_path), 11) except Exception as e: logger.warning(f"Could not load custom font: {e}, falling back to default") font = ImageFont.load_default() for idx, element in elements.items(): # don't draw sheets elements if element.browser_agent_id.startswith("row_") or element.browser_agent_id.startswith("column_"): continue base_color = base_colors[idx % len(base_colors)] color = generate_unique_color(base_color, idx) rect = element.rect # Draw rectangle draw.rectangle( [(rect.left, rect.top), (rect.right, rect.bottom)], outline=color, width=2 ) # Prepare label text = str(idx) # Get precise text dimensions for proper centering text_bbox = draw.textbbox((0, 0), text, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] # Make label size exactly proportional for better aesthetics label_width = text_width + 4 label_height = text_height + 4 # Positioning logic if label_width > rect.width or label_height > rect.height: label_x = rect.left + rect.width label_y = rect.top else: label_x = rect.left + rect.width - label_width label_y = rect.top # Check for overlaps with existing labels label_rect = { 'left': label_x, 'top': label_y, 'right': label_x + label_width, 'bottom': label_y + label_height } for existing in placed_labels: if not (label_rect['right'] < existing['left'] or label_rect['left'] > existing['right'] or label_rect['bottom'] < existing['top'] 
or label_rect['top'] > existing['bottom']): label_y = existing['bottom'] + 2 label_rect['top'] = label_y label_rect['bottom'] = label_y + label_height break # Ensure label is visible within image boundaries img_width, img_height = image.size if label_x < 0: label_x = 0 elif label_x + label_width >= img_width: label_x = img_width - label_width - 1 if label_y < 0: label_y = 0 elif label_y + label_height >= img_height: label_y = img_height - label_height - 1 # Draw label background draw.rectangle( [(label_x, label_y), (label_x + label_width, label_y + label_height)], fill=color ) # magic numbers to center the text text_x = label_x + 3 text_y = label_y - 1 # Draw text draw.text( (text_x, text_y), text, fill=(255, 255, 255), font=font ) placed_labels.append(label_rect) # Convert back to base64 buffer = BytesIO() image.save(buffer, format="PNG") new_image_base64 = base64.b64encode(buffer.getvalue()).decode() return new_image_base64 except Exception as e: logger.error(f"Failed to add highlights to screenshot: {str(e)}") return screenshot_b64 def scale_b64_image(image_b64: str, scale_factor: float) -> str: """ Scale down a base64 encoded image using Pillow. 
    Args:
        image_b64: Base64 encoded image string
        scale_factor: Factor to scale the image by (0.5 = half size)

    Returns:
        Base64 encoded scaled image
    """
    try:
        # Decode base64 to PIL Image
        image_data = base64.b64decode(image_b64)
        image = Image.open(BytesIO(image_data))

        if image is None:
            return image_b64

        # Get original dimensions
        width, height = image.size

        # Calculate new dimensions
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        # Resize the image using high quality resampling
        resized_image = image.resize(
            (new_width, new_height),
            Image.LANCZOS
        )

        # Convert back to base64
        buffer = BytesIO()
        resized_image.save(buffer, format="PNG")
        resized_image_b64 = base64.b64encode(buffer.getvalue()).decode()
        return resized_image_b64
    except Exception:
        return image_b64


def calculate_iou(rect1: Rect, rect2: Rect) -> float:
    """
    Calculate Intersection over Union between two rectangles.

    Args:
        rect1: First rectangle with left, top, right, bottom attributes
        rect2: Second rectangle with left, top, right, bottom attributes

    Returns:
        IoU value
    """
    # Calculate intersection
    intersect_left = max(rect1.left, rect2.left)
    intersect_top = max(rect1.top, rect2.top)
    intersect_right = min(rect1.right, rect2.right)
    intersect_bottom = min(rect1.bottom, rect2.bottom)

    # Check if intersection exists
    if intersect_right < intersect_left or intersect_bottom < intersect_top:
        return 0.0  # No intersection

    # Calculate area of each rectangle
    area1 = (rect1.right - rect1.left) * (rect1.bottom - rect1.top)
    area2 = (rect2.right - rect2.left) * (rect2.bottom - rect2.top)

    # Calculate area of intersection
    intersection_area = (intersect_right - intersect_left) * (intersect_bottom - intersect_top)

    # Calculate union area
    union_area = area1 + area2 - intersection_area

    # Calculate IoU
    return intersection_area / union_area if union_area > 0 else 0.0


def is_fully_contained(rect1: Rect, rect2: Rect) -> bool:
    """
    Check if rect1 is fully contained within rect2.
    Args:
        rect1: First rectangle with left, top, right, bottom attributes
        rect2: Second rectangle with left, top, right, bottom attributes

    Returns:
        True if rect1 is fully contained within rect2
    """
    return (rect1.left >= rect2.left and
            rect1.right <= rect2.right and
            rect1.top >= rect2.top and
            rect1.bottom <= rect2.bottom)


def filter_overlapping_elements(elements: List[InteractiveElement], iou_threshold: float = 0.7) -> List[InteractiveElement]:
    """
    Filter overlapping elements using weight and IoU.

    Args:
        elements: Elements to filter
        iou_threshold: Threshold for considering elements as overlapping

    Returns:
        Filtered elements
    """
    if not elements:
        return []

    # Sort by area (descending), then by weight (descending)
    elements.sort(key=lambda e: (
        -(e.rect.width * e.rect.height),  # Negative area for descending sort
        -e.weight  # Negative weight for descending sort
    ))

    filtered_elements: List[InteractiveElement] = []

    # Add elements one by one, checking against already added elements
    for current in elements:
        should_add = True

        # For each element already in our filtered list
        for existing in filtered_elements:
            # Check overlap with IoU
            iou = calculate_iou(current.rect, existing.rect)
            if iou > iou_threshold:
                should_add = False
                break

            # Check if current element is fully contained within an existing element with higher weight
            if is_fully_contained(current.rect, existing.rect):
                if existing.weight >= current.weight and existing.z_index == current.z_index:
                    should_add = False
                    break
                else:
                    # If current element has higher weight and is more than 50% of the size
                    # of the existing element, remove the existing element
                    if current.rect.width * current.rect.height >= existing.rect.width * existing.rect.height * 0.5:
                        filtered_elements.remove(existing)
                        break

        if should_add:
            filtered_elements.append(current)

    return filtered_elements


def sort_elements_by_position(elements: List[InteractiveElement]) -> List[InteractiveElement]:
    """
    Sort elements by position (top to bottom, left to right).
Args: elements: Elements to sort Returns: Sorted elements """ if not elements: return [] # Define what "same row" means ROW_THRESHOLD = 20 # pixels # First, group elements into rows based on Y position rows = [] current_row = [] # Copy and sort elements by Y position sorted_by_y = sorted(elements, key=lambda e: e.rect.top) # Group into rows for element in sorted_by_y: if not current_row: # Start a new row current_row.append(element) else: # Check if this element is in the same row as the previous ones last_element = current_row[-1] if abs(element.rect.top - last_element.rect.top) <= ROW_THRESHOLD: # Same row current_row.append(element) else: # New row rows.append(list(current_row)) current_row = [element] # Add the last row if not empty if current_row: rows.append(current_row) # Sort each row by X position (left to right) for row in rows: row.sort(key=lambda e: e.rect.left) # Flatten the rows back into a single array elements = [element for row in rows for element in row] for i, element in enumerate(elements): element.index = i return elements def filter_elements( elements: List[InteractiveElement], iou_threshold: float = 0.7 ) -> List[InteractiveElement]: """ Combine interactive elements from multiple detection methods and filter duplicates. 
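# Worked example (hypothetical numbers) of the IoU computation behind
# iou_threshold: two 10x10 boxes offset 5 px horizontally intersect in a
# 5x10 strip, giving an IoU well under the 0.7 default, so both survive.
#
#     a = (0, 0, 10, 10)  # left, top, right, bottom
#     b = (5, 0, 15, 10)
#     inter_left, inter_top = max(a[0], b[0]), max(a[1], b[1])
#     inter_right, inter_bottom = min(a[2], b[2]), min(a[3], b[3])
#     intersection = max(0, inter_right - inter_left) * max(0, inter_bottom - inter_top)  # 50
#     union = 100 + 100 - intersection  # 150
#     print(round(intersection / union, 3))  # 0.333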
Args: elements: Interactive elements from multiple detection methods iou_threshold: Threshold for considering elements as overlapping Returns: Combined and filtered elements """ #Filter overlapping elements filtered = filter_overlapping_elements(elements, iou_threshold) # Sort elements by position sorted_elements = sort_elements_by_position(filtered) return sorted_elements ``` ## /index/cli.py ```py path="/index/cli.py" #!/usr/bin/env python import asyncio import json import logging import os import subprocess import time from typing import Dict, List, Optional import requests import typer from dotenv import load_dotenv from rich.console import Console from rich.logging import RichHandler from rich.markdown import Markdown from rich.panel import Panel from rich.prompt import Prompt from textual.app import App from textual.containers import Container, Horizontal, Vertical from textual.reactive import reactive from textual.widgets import Button, Footer, Header, Input, Static from index.agent.agent import Agent from index.agent.models import AgentOutput, AgentState from index.browser.browser import BrowserConfig from index.llm.llm import BaseLLMProvider from index.llm.providers.anthropic import AnthropicProvider from index.llm.providers.gemini import GeminiProvider from index.llm.providers.openai import OpenAIProvider load_dotenv() # Create Typer app app = typer.Typer(help="Index - Browser AI agent CLI") # Configuration constants BROWSER_STATE_FILE = "browser_state.json" DEFAULT_CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" DEFAULT_DEBUGGING_PORT = 9222 console = Console() def setup_logging(debug: bool = False): """Configure logging based on debug flag""" log_level = logging.INFO if debug else logging.WARNING # Configure root logger logging.basicConfig( level=log_level, format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True, console=console)] ) # Set specific logger levels 
logging.getLogger("index").setLevel(log_level) logging.getLogger("playwright").setLevel(logging.WARNING) # Always keep playwright at WARNING if debug: console.print("[yellow]Debug mode enabled - logging set to INFO level[/]") class AgentSession: """Manages an agent session with state persistence""" def __init__(self, llm: Optional[BaseLLMProvider] = None, use_local_chrome: bool = False, chrome_path: str = DEFAULT_CHROME_PATH, debugging_port: int = DEFAULT_DEBUGGING_PORT, debug: bool = False): self.llm = llm self.chrome_process = None self.use_local_chrome = use_local_chrome self.chrome_path = chrome_path self.debugging_port = debugging_port self.logger = logging.getLogger("index.agent_session") self.storage_state: Optional[Dict] = None # initialize before the branch below may load it browser_config = None if os.path.exists(BROWSER_STATE_FILE) and not use_local_chrome: with open(BROWSER_STATE_FILE, "r") as f: self.storage_state = json.load(f) console.print("[green]Loaded existing browser state[/green]") browser_config = BrowserConfig( storage_state=self.storage_state, viewport_size={ "width": 1200, "height": 800 } ) else: if use_local_chrome: # Launch Chrome and connect to it self._launch_local_chrome() browser_config = BrowserConfig( cdp_url="http://localhost:" + str(self.debugging_port), ) else: browser_config = BrowserConfig( viewport_size={ "width": 1200, "height": 800 } ) self.agent = Agent(llm=self.llm, browser_config=browser_config) self.agent_state: Optional[str] = None self.step_count: int = 0 self.action_results: List[Dict] = [] self.is_running: bool = False def _launch_local_chrome(self): """Launch a local Chrome instance with remote debugging enabled""" # Check if Chrome is already running with the specified debugging port try: response = requests.get(f"http://localhost:{self.debugging_port}/json/version", timeout=2) if response.status_code == 200: console.print(f"[green]Connected to already running Chrome instance on port {self.debugging_port}[/green]") self.logger.info(f"Connected to existing Chrome
instance on port {self.debugging_port}") return except requests.RequestException: # No running Chrome instance found on the specified port, proceed with launching a new one pass console.print(f"[blue]Launching Chrome from {self.chrome_path} with debugging port {self.debugging_port}[/blue]") try: self.chrome_process = subprocess.Popen( [self.chrome_path, f"--remote-debugging-port={self.debugging_port}", "--no-first-run", "--no-default-browser-check"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) console.print("[green]Chrome launched successfully[/green]") self.logger.info(f"Chrome process started with PID {self.chrome_process.pid}") # Give Chrome time to start up time.sleep(2) except Exception as e: self.logger.error(f"Failed to launch Chrome: {str(e)}") console.print(f"[red]Failed to launch Chrome: {str(e)}[/red]") raise def save_state(self, agent_output: AgentOutput): """Save agent state to file""" if agent_output.storage_state: with open(BROWSER_STATE_FILE, "w") as f: json.dump(agent_output.storage_state, f) self.logger.info("Agent state saved to file") console.print("[green]Saved agent state[/green]") async def run_agent(self, prompt: str) -> AgentOutput: """Run the agent with the given prompt""" self.is_running = True self.logger.info(f"Running agent with prompt: {prompt}") try: # Run the agent if self.agent_state: result = await self.agent.run( prompt=prompt, agent_state=self.agent_state, close_context=False, return_storage_state=True, return_agent_state=True ) else: result = await self.agent.run( prompt=prompt, close_context=False, return_storage_state=True, return_agent_state=True ) self.step_count = result.step_count self.agent_state = result.agent_state.model_dump_json() self.save_state(result) return result finally: self.is_running = False async def stream_run(self, prompt: str): """Run the agent with streaming output""" self.is_running = True self.logger.info(f"Running agent with streaming and prompt: {prompt}") try: # Run the agent with 
streaming if self.agent_state: stream = self.agent.run_stream( prompt=prompt, agent_state=self.agent_state, close_context=False, max_steps=500, # large number to allow the agent to run for a long time return_agent_state=True, return_storage_state=True ) else: stream = self.agent.run_stream( prompt=prompt, close_context=False, max_steps=500, # large number to allow the agent to run for a long time return_agent_state=True, return_storage_state=True ) final_output = None async for chunk in stream: # Directly yield the raw chunk without any modifications yield chunk # Store final output for state saving if chunk.type == "final_output": final_output = chunk.content if final_output: self.step_count = final_output.step_count self.agent_state = final_output.agent_state.model_dump_json() self.save_state(final_output) finally: self.is_running = False def reset(self): """Reset agent state""" if os.path.exists(BROWSER_STATE_FILE): os.remove(BROWSER_STATE_FILE) self.agent_state = None self.step_count = 0 self.action_results = [] self.logger.info("Agent state reset") console.print("[yellow]Agent state reset[/yellow]") async def close(self): """Close the agent and any associated resources""" # Close the browser instance if self.agent and self.agent.browser: self.logger.info("Closing browser instance") await self.agent.browser.close() # Terminate Chrome process if launched locally if self.chrome_process: self.logger.info(f"Terminating Chrome process with PID {self.chrome_process.pid}") console.print("[yellow]Terminating local Chrome instance...[/yellow]") self.chrome_process.terminate() self.chrome_process = None class AgentUI(App): """Textual-based UI for interacting with the agent""" CSS = """ Header { background: #3b82f6; color: white; text-align: center; padding: 1; } Footer { background: #1e3a8a; color: white; text-align: center; padding: 1; } #prompt-input { padding: 1 2; border: tall $accent; margin: 1 1; height: 3; } #output-container { height: 1fr; border: solid #ccc; 
background: #f8fafc; padding: 1; margin: 0 1; overflow-y: auto; } #action-results { height: 15; border: solid #ccc; background: #f8fafc; margin: 0 1 1 1; overflow-y: auto; } .action-result { border: solid #e5e7eb; margin: 1 0; padding: 1; } .action-title { color: #3b82f6; text-style: bold; } .action-content { margin-top: 1; } Button { margin: 1 1; } #buttons-container { height: auto; align: center middle; } .running { color: #f97316; text-style: bold; } .completed { color: #22c55e; text-style: bold; } .error { color: #ef4444; text-style: bold; } """ TITLE = "Index Browser Agent CLI" BINDINGS = [ ("q", "quit", "Quit"), ("r", "reset", "Reset Agent"), ("ctrl+s", "send", "Send Message"), ] agent_session = None status = reactive("Ready") def compose(self): yield Header() with Vertical(): with Container(id="output-container"): yield Static(id="output", expand=True) with Container(id="action-results"): yield Static(id="results", expand=True) with Horizontal(id="buttons-container"): yield Button("Send", id="send-btn", variant="primary") yield Button("Reset", id="reset-btn", variant="error") yield Input(placeholder="Enter your task or follow-up message...", id="prompt-input") yield Footer() def update_output(self): """Update the output display""" output = "" if self.agent_session.agent_state: state = AgentState.model_validate_json(self.agent_session.agent_state) # Get the latest user and assistant messages user_msgs = [m for m in state.messages if m.role == "user"] assistant_msgs = [m for m in state.messages if m.role == "assistant"] if user_msgs: latest_user = user_msgs[-1] output += f"[bold blue]User:[/] {latest_user.content}\n\n" if assistant_msgs: latest_assistant = assistant_msgs[-1] output += f"[bold green]Assistant:[/] {latest_assistant.content}\n\n" output += f"[dim]Steps completed: {self.agent_session.step_count}[/]\n" output += f"[dim]Status: {self.status}[/]\n" else: output = "[italic]No previous session. 
Start by sending a task.[/]" self.query_one("#output", Static).update(Markdown(output)) # Update action results if self.agent_session.action_results: results_output = "" for i, result in enumerate(reversed(self.agent_session.action_results[-5:])): action_type = result.get("type", "unknown") content = result.get("content", {}) if action_type == "step": action_result = content.get("action_result", {}) summary = content.get("summary", "No summary available") results_output += f"[bold]Step {i+1}[/]\n" results_output += f"Summary: {summary}\n" if action_result.get("is_done"): results_output += "[green]Task completed[/]\n" if action_result.get("give_control"): results_output += "[yellow]Agent requested human control[/]\n" results_output += f"Message: {action_result.get('content', '')}\n" results_output += "\n" elif action_type == "error": results_output += "[bold red]Error[/]\n" results_output += f"{content}\n\n" self.query_one("#results", Static).update(Markdown(results_output)) async def on_button_pressed(self, event: Button.Pressed): """Handle button presses""" if event.button.id == "send-btn": await self.action_send() elif event.button.id == "reset-btn": self.action_reset() def action_reset(self): """Reset the agent state""" self.agent_session.reset() self.agent_session.action_results = [] self.update_output() async def action_send(self): """Send the current prompt to the agent""" prompt = self.query_one("#prompt-input", Input).value if not prompt.strip(): return self.status = "Running..." 
self.query_one("#prompt-input", Input).value = "" self.update_output() try: # Stream the results to provide real-time feedback async for chunk in self.agent_session.stream_run(prompt): self.agent_session.action_results.append(chunk) self.update_output() await asyncio.sleep(0.1) # Small delay to ensure UI updates self.status = "Ready" except Exception as e: self.status = f"Error: {str(e)}" finally: self.update_output() async def on_mount(self): """Called when the app is mounted""" # Register cleanup handler self.set_interval(0.1, self._check_exit) async def _check_exit(self): """Check if app is exiting and clean up resources""" if self.exiting: if self.agent_session: await self.agent_session.close() def action_quit(self): """Quit the application""" self.exit() @app.command() def run( prompt: str = typer.Option(None, "--prompt", "-p", help="Initial prompt to send to the agent"), use_local_chrome: bool = typer.Option(False, "--local-chrome", help="Use local Chrome instance instead of launching a new browser"), chrome_path: str = typer.Option(DEFAULT_CHROME_PATH, "--chrome-path", help="Path to Chrome executable"), debugging_port: int = typer.Option(DEFAULT_DEBUGGING_PORT, "--port", help="Remote debugging port for Chrome"), debug: bool = typer.Option(False, "--debug", help="Enable debug logging") ): """ Launch the interactive loop for the Index browser agent """ # Set up logging if debug mode is enabled setup_logging(debug) asyncio.run(_interactive_loop( initial_prompt=prompt, use_local_chrome=use_local_chrome, chrome_path=chrome_path, debugging_port=debugging_port, debug=debug )) @app.command(name="ui") def run_ui( prompt: str = typer.Option(None, "--prompt", "-p", help="Initial prompt to send to the agent"), use_local_chrome: bool = typer.Option(False, "--local-chrome", help="Use local Chrome instance instead of launching a new browser"), chrome_path: str = typer.Option(DEFAULT_CHROME_PATH, "--chrome-path", help="Path to Chrome executable"), debugging_port: int = 
typer.Option(DEFAULT_DEBUGGING_PORT, "--port", help="Remote debugging port for Chrome"), debug: bool = typer.Option(False, "--debug", help="Enable debug logging") ): """ Launch the graphical UI for the Index browser agent """ # Set up logging if debug mode is enabled setup_logging(debug) # Select model and check API key llm_provider = select_model_and_check_key() # Initialize UI with the selected LLM provider agent_ui = AgentUI() agent_ui.agent_session = AgentSession( llm=llm_provider, use_local_chrome=use_local_chrome, chrome_path=chrome_path, debugging_port=debugging_port, debug=debug ) if prompt: # If a prompt is provided, we'll send it once the UI is ready async def send_initial_prompt(): await asyncio.sleep(0.5) # Give UI time to initialize agent_ui.query_one("#prompt-input", Input).value = prompt await agent_ui.action_send() agent_ui.set_interval(0.1, lambda: asyncio.create_task(send_initial_prompt())) agent_ui.run() def create_llm_provider(provider: str, model: str) -> BaseLLMProvider: """Create an LLM provider based on model choice""" if provider == "openai": # OpenAI model console.print(f"[cyan]Using OpenAI model: {model}[/]") return OpenAIProvider(model=model, reasoning_effort="low") elif provider == "gemini": # Gemini model if model == "gemini-2.5-pro-preview-03-25": console.print(f"[cyan]Using Gemini model: {model}[/]") return GeminiProvider( model=model, thinking_token_budget=8192 ) elif model == "gemini-2.5-flash-preview-04-17": console.print(f"[cyan]Using Gemini model: {model}[/]") return GeminiProvider( model=model, thinking_token_budget=8192 ) else: raise ValueError(f"Unsupported Gemini model: {model}") elif provider == "anthropic": # Anthropic model console.print(f"[cyan]Using Anthropic model: {model}[/]") return AnthropicProvider( model=model, enable_thinking=True, thinking_token_budget=2048 ) else: raise ValueError(f"Unsupported provider: {provider}") def check_and_save_api_key(required_key: str): """Check if API key exists, prompt for it if 
missing, and save to .env file""" if not os.environ.get(required_key): console.print(f"\n[yellow]API key {required_key} not found in environment.[/]") api_key = Prompt.ask(f"Enter your {required_key}", password=True) # Save to .env file env_path = ".env" if os.path.exists(env_path): # Read existing content with open(env_path, "r") as f: env_content = f.read() env_content += f"\n{required_key}={api_key}" with open(env_path, "w") as f: f.write(env_content) console.print(f"[green]Saved {required_key} to .env file[/]") else: # Create new .env file with open(env_path, "w") as f: f.write(f"{required_key}={api_key}") console.print("[green]Created .env file[/]") # Update environment variable for current session os.environ[required_key] = api_key # Reload dotenv to ensure changes are applied load_dotenv(override=True) def select_model_and_check_key(): """Select a model and check for required API key""" console.print("\n[bold green]Choose an LLM model:[/]") console.print("1. [bold]Gemini 2.5 Pro[/]") console.print("2. [bold]Gemini 2.5 Flash[/]") console.print("3. [bold]Claude 3.7 Sonnet[/]") console.print("4. 
[bold]OpenAI o4-mini[/]") choice = Prompt.ask( "[bold]Select model[/]", choices=["1", "2", "3", "4"], default="1" ) provider = "" model = "" required_key = "" # Create LLM provider based on selection if choice == "1": provider = "gemini" model = "gemini-2.5-pro-preview-03-25" required_key = "GEMINI_API_KEY" elif choice == "2": provider = "gemini" model = "gemini-2.5-flash-preview-04-17" required_key = "GEMINI_API_KEY" elif choice == "3": provider = "anthropic" model = "claude-3-7-sonnet-20250219" required_key = "ANTHROPIC_API_KEY" elif choice == "4": provider = "openai" model = "o4-mini" required_key = "OPENAI_API_KEY" else: raise ValueError(f"Invalid choice: {choice}") # Check and save API key if needed check_and_save_api_key(required_key) return create_llm_provider(provider, model) async def _interactive_loop(initial_prompt: str = None, use_local_chrome: bool = False, chrome_path: str = DEFAULT_CHROME_PATH, debugging_port: int = DEFAULT_DEBUGGING_PORT, debug: bool = False): """Implementation of the interactive loop mode""" # Display welcome panel console.print(Panel.fit( "Index Browser Agent Interactive Mode\n" "Type your message and press Enter. 
The agent will respond.\n" "Press Ctrl+C to exit.", title="Interactive Mode", border_style="blue" )) # Select model and check API key llm_provider = select_model_and_check_key() # Create agent session with selected provider session = AgentSession( llm=llm_provider, use_local_chrome=use_local_chrome, chrome_path=chrome_path, debugging_port=debugging_port, debug=debug ) try: first_message = True awaiting_human_input = False while True: # Check if we're waiting for the user to return control to the agent if awaiting_human_input: console.print("\n[yellow]Agent is waiting for control to be returned.[/]") console.print("[yellow]Press Enter to return control to the agent...[/]", end="") input() # Wait for Enter key user_message = "Returning control back, continue your task" console.print(f"\n[bold blue]Your message:[/] {user_message}") awaiting_human_input = False # Normal message input flow elif first_message and initial_prompt: user_message = initial_prompt console.print(f"\n[bold blue]Your message:[/] {user_message}") first_message = False else: console.print("\n[bold blue]Your message:[/] ", end="") user_message = input() first_message = False if not user_message.strip(): continue console.print("\n[bold cyan]Agent is working...[/]") step_num = 1 human_control_requested = False # Run the agent with streaming output try: async for chunk in session.stream_run(user_message): if chunk.type == "step": action_result = chunk.content.action_result summary = chunk.content.summary # Simple single-line output for steps console.print(f"[bold blue]Step {step_num}:[/] {summary}") # Display additional info for special actions as separate lines if action_result and action_result.is_done and not action_result.give_control: console.print(" [green bold]✓ Task completed successfully![/]") if action_result and action_result.give_control: human_control_requested = True message = action_result.content or "No message provided" console.print(" [yellow bold]⚠ Human control requested:[/]") 
console.print(f" [yellow]{message}[/]") # Increment step counter for next step step_num += 1 elif chunk.type == "step_error": console.print(f"[bold red]Error:[/] {chunk.content}") elif chunk.type == "final_output": # Keep panel for final output result_content = chunk.content.result.content if chunk.content.result else "No result content" console.print(Panel( f"{result_content}", title="Final Output", border_style="green", expand=False )) except Exception as e: console.print(f"[bold red]Error:[/] {str(e)}") console.print(f"[dim]Type: {type(e)}[/]") console.print_exception() # After agent completes if human_control_requested: console.print("\n[yellow]Agent has requested human control.[/]") awaiting_human_input = True else: console.print("\n[green]Agent has completed the task.[/]") console.print("[dim]Waiting for your next message...[/]") except KeyboardInterrupt: console.print("\n[yellow]Exiting interactive mode...[/]") # Close the browser before exiting await session.close() def main(): """Entry point for the CLI""" app() if __name__ == "__main__": main() ``` ## /index/controller/controller.py ```py path="/index/controller/controller.py" import inspect import json import logging from dataclasses import dataclass from functools import wraps from typing import Any, Callable, Dict, List, Optional, Type, get_type_hints from lmnr import Laminar from pydantic import BaseModel from index.agent.models import ActionModel, ActionResult from index.browser.browser import Browser from index.controller.default_actions import register_default_actions logger = logging.getLogger(__name__) @dataclass class Action: """Represents a registered action""" name: str description: str function: Callable browser_context: bool = False class Controller: """Controller for browser actions with integrated registry functionality""" def __init__( self, exclude_actions: List[str] = None, output_model: Optional[Type[BaseModel]] = None, ): self.exclude_actions = exclude_actions or [] self.output_model 
= output_model self._actions: Dict[str, Action] = {} # Register default actions register_default_actions(self, self.output_model) def action(self, description: str = None): """ Decorator for registering actions Args: description: Optional description of what the action does. If not provided, uses the function's docstring. """ def decorator(func: Callable) -> Callable: if func.__name__ in self.exclude_actions: return func # Use provided description or function docstring action_description = description if action_description is None: action_description = inspect.getdoc(func) or "No description provided" # Clean up docstring (remove indentation) action_description = inspect.cleandoc(action_description) browser_context = False if 'browser' in inspect.signature(func).parameters: browser_context = True @wraps(func) async def async_wrapper(*args, **kwargs): return await func(*args, **kwargs) # Register the action self._actions[func.__name__] = Action( name=func.__name__, description=action_description, function=async_wrapper, browser_context=browser_context, ) return func return decorator async def execute_action( self, action: ActionModel, browser: Browser, ) -> ActionResult: """Execute an action from an ActionModel""" action_name = action.name params = action.params if params is not None: with Laminar.start_as_current_span( name=action_name, input={ 'action': action_name, 'params': params, }, span_type='TOOL', ): logger.info(f'Executing action: {action_name} with params: {params}') action = self._actions.get(action_name) if action is None: raise ValueError(f'Action {action_name} not found') try: kwargs = params.copy() if params else {} # Add browser to kwargs if it's provided if action.browser_context and browser is not None: kwargs['browser'] = browser result = await action.function(**kwargs) Laminar.set_span_output(result) return result except Exception as e: raise RuntimeError(f'Error executing action {action_name}: {str(e)}') from e else: raise ValueError(f'Params are not provided for action: {action_name}') def get_action_descriptions(self) -> str: """Return a newline-separated JSON description of all registered actions and their metadata""" action_info = [] for name, action in self._actions.items(): sig = inspect.signature(action.function) type_hints = get_type_hints(action.function) # Build parameter info params = {} for param_name in sig.parameters.keys(): if param_name == 'browser': # Skip browser parameter in descriptions continue param_type = type_hints.get(param_name, Any).__name__ params[param_name] = { 'type': param_type, } action_info.append(json.dumps({ 'name': name, 'description': action.description, 'parameters': params }, indent=2)) return '\n\n'.join(action_info) ``` ## /index/controller/default_actions.py ```py path="/index/controller/default_actions.py" import asyncio import json import logging import platform import re from tenacity import retry, stop_after_attempt, wait_exponential from index.agent.models import ActionResult from index.browser.browser import Browser logger = logging.getLogger(__name__) def register_default_actions(controller, output_model=None): """Register all default browser actions to the provided controller""" @controller.action('Complete task') async def done(text: str): return ActionResult(is_done=True, content=text) @controller.action() async def give_human_control(message: str, browser: Browser): """Give human control of the browser. Use this action when you need to use user information, such as first name, last name, email, phone number, booking information, login/password, etc. to proceed with the task. Also, if you can't solve the CAPTCHA, use this action. Args: message: Message to give to the human, explaining why you need human intervention. """ return ActionResult(give_control=True, content=message, is_done=True) @controller.action() async def search_google(query: str, browser: Browser): """ Open google search in new tab and search for the query.
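# Note: this action interpolates the raw query into the URL. A sketch of
# percent-encoding the query first (an assumption for robustness with '&'
# or '#' in queries, not what this function currently does):
#
#     from urllib.parse import quote_plus
#
#     query = "laminar index agent"
#     url = f"https://www.google.com/search?q={quote_plus(query)}&udm=14"
#     print(url)  # https://www.google.com/search?q=laminar+index+agent&udm=14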
""" page = await browser.get_current_page() await page.goto(f'https://www.google.com/search?q={query}&udm=14') await page.wait_for_load_state() msg = f"Searched for '{query}' in Google" logger.info(msg) return ActionResult(content=msg) @controller.action() @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True, before_sleep=lambda retry_state: logger.warning( f"Retrying step after error: {retry_state.outcome.exception()}. Attempt {retry_state.attempt_number}" ) ) async def go_to_url(url: str, browser: Browser): """Navigate to URL in the current tab""" page = await browser.get_current_page() await page.goto(url, wait_until='domcontentloaded') await asyncio.sleep(1.5) msg = f"Navigated to {url}" logger.info(msg) return ActionResult(content=msg) @controller.action() async def go_back_to_previous_page(browser: Browser): """Go back to the previous page""" try: page = await browser.get_current_page() await page.go_back(wait_until='domcontentloaded') await asyncio.sleep(2) msg = 'Navigated back to the previous page' logger.info(msg) return ActionResult(content=msg) except Exception as e: logger.debug(f'During go_back: {e}') return ActionResult(error=str(e)) @controller.action() async def click_on_spreadsheet_cell(row: str, column: str, browser: Browser) -> ActionResult: """Click on a spreadsheet cell at a specific row and column. You HAVE to use this action when you need to click on a cell in a spreadsheet. DON'T try to use click_element action, it will not work. Args: row: Row of the cell to click on, it should be a number formatted as a string. e.g. "1" column: Column of the cell to click on, it should be a letter formatted as a string. e.g. 
"A" """ page = await browser.get_current_page() state = browser.get_state() elements = state.interactive_elements.values() row_element = next((e for e in elements if e.browser_agent_id == f"row_{row}"), None) column_element = next((e for e in elements if e.browser_agent_id == f"column_{column}"), None) if not row_element or not column_element: return ActionResult(error='Row or column element not found - pay close attention to the row and column numbers.') # resetting click just in case await page.mouse.click(state.viewport.width / 2, state.viewport.height / 2) await asyncio.sleep(0.05) await page.mouse.click(column_element.center.x, row_element.center.y, click_count=2) await asyncio.sleep(0.05) return ActionResult(content=f'Clicked on spreadsheet cell with row {row} and column {column}') @controller.action() async def click_element(index: int, browser: Browser, wait_after_click: bool = False): """ Click on the element with index. Args: index: Index of the element to click on. wait_after_click: If True, wait for 2 seconds after clicking the element. Only set it to True when you think that clicking will trigger loading state, for instance navigation to new page, search, loading of content, etc. """ # clean index if it contains any non-numeric characters cleaned_index_str = re.sub(r'\D', '', str(index)) if cleaned_index_str == '': logger.error(f'Index is not a number.
Index: {index}') return ActionResult(error="`index` should be a valid number.") index = int(cleaned_index_str) state = browser.get_state() if index not in state.interactive_elements: return ActionResult(error=f"Element with index {index} does not exist - retry or use alternative actions.") element = state.interactive_elements[index] initial_pages = len(browser.context.pages) if browser.context else 0 try: page = await browser.get_current_page() await page.mouse.click(element.center.x, element.center.y) msg = f'Clicked element with index {index}: <{element.tag_name}>' logger.info(msg) if browser.context and len(browser.context.pages) > initial_pages: new_tab_msg = 'New tab opened - switching to it' msg += f' - {new_tab_msg}' logger.info(new_tab_msg) await browser.switch_to_tab(-1) if wait_after_click: await asyncio.sleep(2) return ActionResult(content=msg) except Exception as e: return ActionResult(error=str(e)) @controller.action( description='Use this action to wait for the page to load, if you see that the content on the clean screenshot is empty or loading UI elements such as skeleton screens. This action will wait for page to load. Then you can continue with your actions.', ) async def wait_for_page_to_load() -> ActionResult: return ActionResult(content='Waited for page to load') @controller.action() async def enter_text(text: str, press_enter: bool, browser: Browser): """Enter text with a keyboard. Use it AFTER you have clicked on an input element. This action will override the current text in the element. Args: text: Text to enter with a keyboard. press_enter: If True, `Enter` button will be pressed after entering the text. Use this when you think it would make sense to press `Enter` after entering the text, such as when you're submitting a form, performing a search, etc. 
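# Sketch of the clear-then-type key sequence this action performs, using a
# hypothetical stub in place of Playwright's page.keyboard:
#
#     import asyncio
#
#     class StubKeyboard:
#         def __init__(self):
#             self.events = []
#         async def press(self, key):
#             self.events.append(f"press:{key}")
#         async def type(self, text):
#             self.events.append(f"type:{text}")
#
#     async def enter_text_sketch(kb, text, press_enter):
#         await kb.press("ControlOrMeta+a")  # select any existing content
#         await kb.press("Backspace")        # clear it
#         await kb.type(text)                # type the replacement text
#         if press_enter:
#             await kb.press("Enter")
#
#     kb = StubKeyboard()
#     asyncio.run(enter_text_sketch(kb, "hello", True))
#     print(kb.events)
#     # ['press:ControlOrMeta+a', 'press:Backspace', 'type:hello', 'press:Enter']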
""" try: page = await browser.get_current_page() # clear the element await page.keyboard.press("ControlOrMeta+a") await asyncio.sleep(0.1) await page.keyboard.press("Backspace") await asyncio.sleep(0.1) # input text into the element await page.keyboard.type(text) if press_enter: await page.keyboard.press("Enter") await asyncio.sleep(2) msg = f'Entered "{text}" on the keyboard. Make sure to double check that the text was entered to where you intended.' logger.info(msg) return ActionResult(content=msg) except Exception as e: return ActionResult(error=f'Failed to enter text. Error: {str(e)}') # Tab Management Actions @controller.action('Switch tab') async def switch_tab(page_id: int, browser: Browser): await browser.switch_to_tab(page_id) await asyncio.sleep(0.5) msg = f'Switched to tab {page_id}' logger.info(msg) return ActionResult(content=msg) @controller.action('Open url in new tab') async def open_tab(url: str, browser: Browser): await browser.create_new_tab(url) msg = f'Opened new tab with {url}' logger.info(msg) return ActionResult(content=msg) @controller.action( "Scrolls entire page down. Use this action when you want to scroll the entire page down. Don't use this action if you want to scroll over a specific scrollable area on a page." ) async def scroll_page_down(browser: Browser): page = await browser.get_current_page() state = browser.get_state() # move mouse to the center of the page await page.mouse.move(state.viewport.width / 2, state.viewport.height / 2) await asyncio.sleep(0.1) # scroll down by one page await page.mouse.wheel(0, state.viewport.height * 0.8) return ActionResult(content="Scrolled mouse wheel down (it doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)") @controller.action( "Scrolls entire page up. Use this action when you want to scroll the entire page up. Don't use this action if you want to scroll over a specific scrollable area on a page." 
)
async def scroll_page_up(browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()
    # move mouse to the center of the page
    await page.mouse.move(state.viewport.width / 2, state.viewport.height / 2)
    await asyncio.sleep(0.1)
    # scroll up by one page
    await page.mouse.wheel(0, -state.viewport.height * 0.8)
    return ActionResult(content="Scrolled mouse wheel up (it doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")

@controller.action(
    "Moves mouse to the element with index `index`, located inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel down."
)
async def scroll_down_over_element(index: int, browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()

    if index not in state.interactive_elements:
        return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')

    element = state.interactive_elements[index]
    await page.mouse.move(element.center.x, element.center.y)
    await asyncio.sleep(0.1)
    await page.mouse.wheel(0, state.viewport.height / 3)
    return ActionResult(content=f"Moved mouse to element with index {index} and scrolled mouse wheel down. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")

@controller.action(
    "Moves mouse to the element with index `index`, located inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel up."
)
async def scroll_up_over_element(index: int, browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()

    if index not in state.interactive_elements:
        return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')

    element = state.interactive_elements[index]
    await page.mouse.move(element.center.x, element.center.y)
    await asyncio.sleep(0.1)
    await page.mouse.wheel(0, -state.viewport.height / 3)
    return ActionResult(content=f"Moved mouse to element with index {index} and scrolled mouse wheel up. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")

@controller.action(
    "Moves mouse at the location of the element with index `index`, which should be inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel horizontally to the right."
)
async def scroll_right_over_element(index: int, browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()

    if index not in state.interactive_elements:
        return ActionResult(error=f'Element index {index} does not exist - retry or use an alternative action')

    element = state.interactive_elements[index]
    await page.mouse.move(element.center.x, element.center.y)
    await asyncio.sleep(0.1)
    await page.mouse.wheel(state.viewport.width / 3, 0)
    return ActionResult(content=f"Moved mouse to element with index {index} and scrolled mouse wheel horizontally to the right. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")

@controller.action(
    "Moves mouse at the location of the element with index `index`, which should be inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel horizontally to the left."
)
async def scroll_left_over_element(index: int, browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()

    if index not in state.interactive_elements:
        return ActionResult(error=f'Element index {index} does not exist - retry or use an alternative action')

    element = state.interactive_elements[index]
    await page.mouse.move(element.center.x, element.center.y)
    await asyncio.sleep(0.1)
    await page.mouse.wheel(-state.viewport.width / 3, 0)
    return ActionResult(content=f"Moved mouse to element with index {index} and scrolled mouse wheel horizontally to the left. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")

@controller.action(
    'Press enter key. Use this action when you need to submit a form or perform an action that requires pressing enter.'
)
async def press_enter(browser: Browser):
    page = await browser.get_current_page()
    await page.keyboard.press('Enter')
    return ActionResult(content='Pressed enter key')

@controller.action(
    'Remove all text in the element with index.'
)
async def clear_text_in_element(index: int, browser: Browser):
    page = await browser.get_current_page()
    state = browser.get_state()

    if index not in state.interactive_elements:
        return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')

    element = state.interactive_elements[index]
    await page.mouse.move(element.center.x, element.center.y)
    await page.mouse.click(element.center.x, element.center.y)
    await asyncio.sleep(0.1)

    if platform.system() == "Darwin":
        await page.keyboard.press('Meta+A')
    else:
        await page.keyboard.press('Control+A')
    await asyncio.sleep(0.1)
    await page.keyboard.press('Backspace')
    return ActionResult(content='Removed all text in the element with index')

@controller.action()
async def get_select_options(index: int, browser: Browser) -> ActionResult:
    """Get all options from an element by the text (name) of the option.
    Use this after get_select_options and when you need to select an option from a dropdown.',
)
async def select_dropdown_option(
    index: int,
    option: str,
    browser: Browser,
) -> ActionResult:
    """Select dropdown option by the text of the option you want to select"""
    try:
        # Get the interactive element
        page = await browser.get_current_page()
        interactive_elements = browser.get_state().interactive_elements

        # Verify the element exists and is a select
        if index not in interactive_elements:
            return ActionResult(error=f"No element found with index {index}")

        element = interactive_elements[index]

        # Check if it's a select element
        if element.tag_name.lower() != 'select':
            return ActionResult(error=f"Element {index} is not a select element, it's a {element.tag_name}")

        logger.debug(f"Attempting to select '{option}' using browser_agent_id: {element.browser_agent_id}")

        # Use JavaScript to select the option using the unique ID
        result = await page.evaluate("""
            (args) => {
                const uniqueId = args.uniqueId;
                const optionText = args.optionText;

                try {
                    // Find the select element by its unique ID (searches the main document)
                    function findElementByUniqueId(root, id) {
                        let element = document.querySelector(`[data-browser-agent-id="${id}"]`);
                        if (element) return element;
                    }

                    const select = findElementByUniqueId(window, uniqueId);
                    if (!select) {
                        return { success: false, error: "Select element not found with ID: " + uniqueId };
                    }

                    // Find the option with matching text
                    let found = false;
                    let selectedValue = null;
                    let selectedIndex = -1;

                    for (let i = 0; i < select.options.length; i++) {
                        const opt = select.options[i];
                        if (opt.text === optionText) {
                            // Select this option
                            opt.selected = true;
                            found = true;
                            selectedValue = opt.value;
                            selectedIndex = i;

                            // Trigger change event
                            const event = new Event('change', { bubbles: true });
                            select.dispatchEvent(event);
                            break;
                        }
                    }

                    if (found) {
                        return { success: true, value: selectedValue, index: selectedIndex };
                    } else {
                        return {
                            success: false,
error: "Option not found: " + optionText, availableOptions: Array.from(select.options).map(o => o.text) }; } } catch (e) { return { success: false, error: e.toString() }; } } """, {"uniqueId": element.browser_agent_id, "optionText": option}) if result.get('success'): msg = f"Selected option '{option}' with value '{result.get('value')}' at index {result.get('index')}" logger.info(msg) return ActionResult(content=msg) else: error_msg = result.get('error', 'Unknown error') if 'availableOptions' in result: available = result.get('availableOptions', []) error_msg += f". Available options: {', '.join(available)}" logger.error(f"Selection failed: {error_msg}") return ActionResult(error=error_msg) except Exception as e: msg = f'Selection failed: {str(e)}' logger.error(msg) return ActionResult(error=msg) ``` ## /index/llm/llm.py ```py path="/index/llm/llm.py" from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel class MessageRole(Enum): SYSTEM = "system" USER = "user" ASSISTANT = "assistant" TOOL = "tool" # For OpenAI function calling responses @dataclass class MessageContent: """Base class for message content""" cache_control: Optional[bool] = None @dataclass class TextContent(MessageContent): """Text content in a message""" text: str = "" type: str = "text" @dataclass class ImageContent(MessageContent): """Image content in a message""" image_b64: Optional[str] = None image_url: Optional[str] = None type: str = "image" @dataclass class ThinkingBlock(MessageContent): """Thinking block in a message""" thinking: str = "" signature: str = "" type: str = "thinking" @dataclass class Message: """A message in a conversation""" role: Union[str, MessageRole] content: Union[str, List[Union[TextContent, ImageContent, ThinkingBlock]]] name: Optional[str] = None # For tool/function messages tool_call_id: Optional[str] = None # For tool/function responses 
    is_state_message: Optional[bool] = False

    def __post_init__(self):
        # Convert role enum to string if needed
        if isinstance(self.role, MessageRole):
            self.role = self.role.value

        # Convert string content to TextContent if needed
        if isinstance(self.content, str):
            self.content = [TextContent(text=self.content)]
        elif isinstance(self.content, (TextContent, ImageContent)):
            self.content = [self.content]

    def to_openai_format(self) -> Dict:
        """Convert to OpenAI message format"""
        message = {"role": self.role}

        if isinstance(self.content, str):
            message["content"] = self.content
        elif isinstance(self.content, list):
            content_blocks = []
            for content_block in self.content:
                block = {}
                if isinstance(content_block, TextContent):
                    block["type"] = "text"
                    block["text"] = content_block.text
                elif isinstance(content_block, ImageContent):
                    block["type"] = "image_url"
                    block["image_url"] = {
                        "url": "data:image/png;base64," + content_block.image_b64
                    }
                content_blocks.append(block)
            message["content"] = content_blocks

        return message

    def to_anthropic_format(self, enable_cache_control: bool = True) -> Dict:
        """Convert to Anthropic message format"""
        message = {"role": self.role}

        if isinstance(self.content, str):
            message["content"] = self.content
        elif isinstance(self.content, list):
            content_blocks = []
            for content_block in self.content:
                block = {}
                if isinstance(content_block, TextContent):
                    block["type"] = "text"
                    block["text"] = content_block.text
                elif isinstance(content_block, ImageContent):
                    block["type"] = "image"
                    block["source"] = {
                        "type": "base64",
                        "media_type": "image/png",  # This should be configurable based on image type
                        "data": content_block.image_b64 if content_block.image_b64 else content_block.image_url
                    }
                elif isinstance(content_block, ThinkingBlock):
                    block["type"] = "thinking"
                    block["thinking"] = content_block.thinking
                    block["signature"] = content_block.signature

                if content_block.cache_control and enable_cache_control:
                    block["cache_control"] = {"type": "ephemeral"}

                content_blocks.append(block)
message["content"] = content_blocks return message def to_gemini_format(self) -> Dict: """Convert to Gemini message format""" parts = [] if isinstance(self.content, str): parts = [{"text": self.content}] elif isinstance(self.content, list): for content_block in self.content: if isinstance(content_block, TextContent): parts.append({"text": content_block.text}) elif isinstance(content_block, ImageContent): if content_block.image_b64: parts.append({"inline_data": { "mime_type": "image/png", "data": content_block.image_b64 }}) elif content_block.image_url: parts.append({"file_data": { "mime_type": "image/png", "file_uri": content_block.image_url }}) return { "role": 'model' if self.role == 'assistant' else 'user', "parts": parts } def remove_cache_control(self): if isinstance(self.content, list): for content_block in self.content: if isinstance(content_block, TextContent): content_block.cache_control = None elif isinstance(content_block, ImageContent): content_block.cache_control = None def add_cache_control_to_state_message(self): if not self.is_state_message or not isinstance(self.content, list) or len(self.content) < 3: return if len(self.content) == 3: self.content[-1].cache_control = True def has_cache_control(self): if not isinstance(self.content, list): return False return any(content.cache_control for content in self.content) class LLMResponse(BaseModel): content: str raw_response: Any usage: Dict[str, int] thinking: Optional[ThinkingBlock] = None class BaseLLMProvider(ABC): def __init__(self, model: str): self.model = model @abstractmethod async def call( self, messages: List[Message], temperature: float = 1, max_tokens: Optional[int] = None, **kwargs ) -> LLMResponse: pass ``` ## /index/llm/providers/__init__.py ```py path="/index/llm/providers/__init__.py" from .anthropic import AnthropicProvider from .anthropic_bedrock import AnthropicBedrockProvider from .gemini import GeminiProvider from .openai import OpenAIProvider __all__ = [ "OpenAIProvider", 
"AnthropicProvider", "AnthropicBedrockProvider", "GeminiProvider", ] ``` ## /index/llm/providers/anthropic.py ```py path="/index/llm/providers/anthropic.py" import logging from typing import List, Optional import backoff from anthropic import AsyncAnthropic from ..llm import BaseLLMProvider, LLMResponse, Message, ThinkingBlock from ..providers.anthropic_bedrock import AnthropicBedrockProvider logger = logging.getLogger(__name__) class AnthropicProvider(BaseLLMProvider): def __init__(self, model: str, enable_thinking: bool = True, thinking_token_budget: Optional[int] = 2048): super().__init__(model=model) self.client = AsyncAnthropic() self.thinking_token_budget = thinking_token_budget self.anthropic_bedrock = AnthropicBedrockProvider(model=f"us.anthropic.{model}-v1:0", enable_thinking=enable_thinking, thinking_token_budget=thinking_token_budget) self.enable_thinking = enable_thinking @backoff.on_exception( backoff.constant, # constant backoff Exception, # retry on any exception max_tries=3, # stop after 3 attempts interval=10, on_backoff=lambda details: logger.info( f"API error, retrying in {details['wait']:.2f} seconds... 
(attempt {details['tries']})" ) ) async def call( self, messages: List[Message], temperature: float = -1, max_tokens: Optional[int] = 16000, **kwargs ) -> LLMResponse: # Make a copy of messages to prevent modifying the original list during retries messages_copy = messages.copy() if len(messages_copy) < 2 or messages_copy[0].role != "system": raise ValueError("System message is required for Anthropic and length of messages must be at least 2") system_message = messages_copy[0] if self.enable_thinking: try: response = await self.client.messages.create( model=self.model, system=system_message.to_anthropic_format()["content"], messages=[msg.to_anthropic_format() for msg in messages_copy[1:]], temperature=1, thinking={ "type": "enabled", "budget_tokens": self.thinking_token_budget, }, max_tokens=max(self.thinking_token_budget + 1, max_tokens), **kwargs ) except Exception as e: logger.error(f"Error calling Anthropic: {str(e)}") response = await self.anthropic_bedrock.call( messages_copy, **kwargs ) return LLMResponse( content=response.content[1].text, raw_response=response, usage=response.usage.model_dump(), thinking=ThinkingBlock(thinking=response.content[0].thinking, signature=response.content[0].signature) ) else: response = await self.client.messages.create( model=self.model, messages=[msg.to_anthropic_format() for msg in messages_copy[1:]], temperature=temperature, max_tokens=max_tokens, system=system_message.to_anthropic_format()["content"], **kwargs ) return LLMResponse( content=response.content[0].text, raw_response=response, usage=response.usage.model_dump() ) ``` ## /index/llm/providers/anthropic_bedrock.py ```py path="/index/llm/providers/anthropic_bedrock.py" import logging import os from typing import List, Optional import backoff from anthropic import AsyncAnthropicBedrock from dotenv import load_dotenv from ..llm import BaseLLMProvider, LLMResponse, Message load_dotenv() logger = logging.getLogger(__name__) class AnthropicBedrockProvider(BaseLLMProvider): 
    def __init__(self, model: str, enable_thinking: bool = True, thinking_token_budget: Optional[int] = 8192):
        super().__init__(model=model)
        self.client = AsyncAnthropicBedrock(
            aws_access_key=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            aws_region=os.getenv('AWS_REGION'),
        )
        self.enable_thinking = enable_thinking
        self.thinking_token_budget = thinking_token_budget

    @backoff.on_exception(
        backoff.constant,  # constant backoff
        Exception,         # retry on any exception
        max_tries=3,       # stop after 3 attempts
        interval=10,
    )
    async def call(
        self,
        messages: List[Message],
        temperature: float = 1,
        max_tokens: Optional[int] = 2048,
        **kwargs
    ) -> LLMResponse:
        messages_copy = messages.copy()

        if len(messages_copy) < 2 or messages_copy[0].role != "system":
            raise ValueError("System message is required for Anthropic Bedrock and length of messages must be at least 2")

        system_message = messages_copy[0]

        try:
            if self.enable_thinking:
                response = await self.client.messages.create(
                    model=self.model,
                    system=system_message.to_anthropic_format(enable_cache_control=False)["content"],
                    messages=[msg.to_anthropic_format(enable_cache_control=False) for msg in messages_copy[1:]],
                    temperature=1,
                    thinking={
                        "type": "enabled",
                        "budget_tokens": self.thinking_token_budget,
                    },
                    max_tokens=max(self.thinking_token_budget + 1, max_tokens),
                    **kwargs
                )
                return LLMResponse(
                    content=response.content[1].text,
                    raw_response=response,
                    usage=response.usage
                )
            else:
                response = await self.client.messages.create(
                    model=self.model,
                    messages=[msg.to_anthropic_format(enable_cache_control=False) for msg in messages_copy[1:]],
                    temperature=temperature,
                    max_tokens=max_tokens,
                    system=system_message.to_anthropic_format(enable_cache_control=False)["content"],
                    **kwargs
                )
                return LLMResponse(
                    content=response.content[0].text,
                    raw_response=response,
                    usage=response.usage
                )
        except Exception as e:
            logger.error(f"Error calling Anthropic Bedrock: {str(e)}")
            raise e
```

## /index/llm/providers/gemini.py
```py path="/index/llm/providers/gemini.py"
import logging
import os
from typing import List, Optional

import backoff
from google import genai

from ..llm import BaseLLMProvider, LLMResponse, Message

logger = logging.getLogger(__name__)

class GeminiProvider(BaseLLMProvider):
    def __init__(self, model: str, thinking_token_budget: int = 8192):
        super().__init__(model=model)
        self.client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        self.thinking_token_budget = thinking_token_budget

    @backoff.on_exception(
        backoff.constant,  # constant backoff
        Exception,         # retry on any exception
        max_tries=3,       # stop after 3 attempts
        interval=0.5,
        on_backoff=lambda details: logger.info(
            f"API error, retrying in {details['wait']:.2f} seconds... (attempt {details['tries']})"
        ),
    )
    async def call(
        self,
        messages: List[Message],
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> LLMResponse:
        if len(messages) < 2 or messages[0].role != "system":
            raise ValueError("System message is required and length of messages must be at least 2")

        system = messages[0].content[0].text
        gemini_messages = [msg.to_gemini_format() for msg in messages[1:]]

        config = {
            "temperature": temperature,
            "thinking_config": {
                "thinking_budget": self.thinking_token_budget
            },
            "system_instruction": {
                "text": system
            }
        }
        if max_tokens:
            config["max_output_tokens"] = max_tokens

        response = await self.client.aio.models.generate_content(
            model=self.model,
            contents=gemini_messages,
            config=config,
        )

        # Extract usage information if available
        usage = {}
        if hasattr(response, "usage_metadata"):
            usage = {
                "prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", 0),
                "completion_tokens": getattr(response.usage_metadata, "candidates_token_count", 0),
                "total_tokens": getattr(response.usage_metadata, "total_token_count", 0)
            }

        return LLMResponse(
            content=response.text,
            raw_response=response,
            usage=usage
        )
```

## /index/llm/providers/openai.py

```py path="/index/llm/providers/openai.py"
from typing import List, Optional

from openai import AsyncOpenAI

from ..llm import BaseLLMProvider, LLMResponse, Message

class OpenAIProvider(BaseLLMProvider):
    def __init__(self, model: str, reasoning_effort: Optional[str] = "low"):
        super().__init__(model=model)
        self.client = AsyncOpenAI()
        self.reasoning_effort = reasoning_effort

    async def call(
        self,
        messages: List[Message],
        temperature: float = 1.0,
    ) -> LLMResponse:
        args = {
            "temperature": temperature,
        }

        if self.model.startswith("o") and self.reasoning_effort:
            args["reasoning_effort"] = self.reasoning_effort
            args["temperature"] = 1

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[msg.to_openai_format() for msg in messages],
            **args
        )

        return LLMResponse(
            content=response.choices[0].message.content,
            raw_response=response,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        )
```

## /pyproject.toml

```toml path="/pyproject.toml"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.metadata]
allow-direct-references = true

[tool.hatch.build.targets.wheel]
packages = ["index"]

[project]
name = "lmnr-index"
version = "0.1.9"
description = "Index - SOTA browser AI agent for autonomous task execution on the web"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "anthropic[bedrock]>=0.49.0",
    "backoff>=2.2.1",
    "lmnr[anthropic,openai]>=0.5.1",
    "openai>=1.65.2",
    "playwright>=1.50.0",
    "tenacity>=9.0.0",
    "pillow>=11.1.0",
    "rich>=13.5.0",
    "textual>=0.50.1",
    "typer>=0.9.0",
    "google-genai>=1.11.0",
]

[project.scripts]
index = "index.cli:main"

[tool.uv]
dev-dependencies = [
    "pytest>=8.3.3"
]

[project.license]
file = "LICENSE"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
addopts = "-v -ra -q"
```

## /static/traces.png

Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/static/traces.png
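All providers consume the shared `Message` abstraction from `index/llm/llm.py`, which normalizes one conversation representation into each vendor's wire format. A minimal standalone sketch of the OpenAI-format conversion (the `Message`/`TextContent` classes are re-declared here so the snippet runs without installing the package; the real implementation also handles image blocks and role enums):

```python
from dataclasses import dataclass
from typing import Dict, List, Union

@dataclass
class TextContent:
    text: str = ""
    type: str = "text"

@dataclass
class Message:
    role: str
    content: Union[str, List[TextContent]]

    def to_openai_format(self) -> Dict:
        # Plain-string content passes through; block lists become typed parts
        if isinstance(self.content, str):
            return {"role": self.role, "content": self.content}
        return {
            "role": self.role,
            "content": [{"type": "text", "text": block.text} for block in self.content],
        }

msg = Message(role="user", content=[TextContent(text="Find the pricing page")])
print(msg.to_openai_format())
# {'role': 'user', 'content': [{'type': 'text', 'text': 'Find the pricing page'}]}
```

Keeping the conversion inside `Message` is what lets `OpenAIProvider`, `AnthropicProvider`, and `GeminiProvider` share one message list and diverge only at the call site.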
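The Anthropic providers above wrap `call` in `backoff.on_exception(backoff.constant, Exception, max_tries=3, interval=10)`. The retry semantics can be sketched without the `backoff` dependency (`retry_constant` and `flaky_call` are hypothetical names for illustration, and the sleep is elided so the sketch runs instantly):

```python
import functools

def retry_constant(max_tries: int, interval: float = 0.0):
    """Retry a function on any exception, mirroring backoff.constant semantics."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_tries:
                        raise  # out of attempts: re-raise, as backoff does
                    # a real implementation would sleep `interval` seconds here
        return wrapper
    return decorator

calls = {"n": 0}

@retry_constant(max_tries=3)
def flaky_call():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient API error")
    return "ok"

print(flaky_call(), calls["n"])  # ok 3
```

A constant interval with a small `max_tries` is a reasonable fit for LLM APIs, where transient 429/5xx errors usually clear within seconds and longer outages should surface quickly rather than stall the agent loop.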