```
├── .github/
│   └── workflows/
│       ├── claude-code-review.yml (400 tokens)
│       └── claude.yml (400 tokens)
├── .gitignore (700 tokens)
├── CLAUDE.md (1300 tokens)
├── LICENSE (omitted)
├── README.md (5.2k tokens)
├── docstrange/
│   ├── WEB_INTERFACE.md (700 tokens)
│   ├── __init__.py (100 tokens)
│   ├── cli.py (3.8k tokens)
│   ├── config.py (100 tokens)
│   ├── exceptions.py (100 tokens)
│   ├── extractor.py (3.1k tokens)
│   ├── pipeline/
│   │   ├── __init__.py
│   │   ├── layout_detector.py (2.5k tokens)
│   │   ├── model_downloader.py (2.5k tokens)
│   │   ├── nanonets_processor.py (1200 tokens)
│   │   ├── neural_document_processor.py (5.6k tokens)
│   │   └── ocr_service.py (1500 tokens)
│   ├── processors/
│   │   ├── __init__.py (200 tokens)
│   │   ├── base.py (600 tokens)
│   │   ├── cloud_processor.py (2.8k tokens)
│   │   ├── docx_processor.py (1600 tokens)
│   │   ├── excel_processor.py (1400 tokens)
│   │   ├── gpu_processor.py (4.1k tokens)
│   │   ├── html_processor.py (400 tokens)
│   │   ├── image_processor.py (900 tokens)
│   │   ├── pdf_processor.py (1900 tokens)
│   │   ├── pptx_processor.py (1100 tokens)
│   │   ├── txt_processor.py (700 tokens)
│   │   └── url_processor.py (2.8k tokens)
│   ├── result.py (8.1k tokens)
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth_service.py (6.2k tokens)
│   │   └── ollama_service.py (2.4k tokens)
│   ├── static/
│   │   ├── logo_clean.png
│   │   ├── script.js (2.7k tokens)
│   │   └── styles.css (2.7k tokens)
│   ├── templates/
│   │   └── index.html (8.6k tokens)
│   ├── utils/
│   │   ├── __init__.py (100 tokens)
│   │   └── gpu_utils.py (500 tokens)
│   └── web_app.py (1700 tokens)
├── example.py (200 tokens)
├── examples/
│   └── test.py (100 tokens)
├── mcp_server_module/
│   ├── README.md (2.1k tokens)
│   ├── __init__.py
│   ├── __main__.py
│   ├── claude_desktop_config.json (100 tokens)
│   └── server.py (6.2k tokens)
├── pyproject.toml (800 tokens)
├── scripts/
│   ├── README.md (400 tokens)
│   ├── __init__.py
│   ├── prepare_s3_models.py (900 tokens)
│   ├── setup_dev.py (400 tokens)
│   └── setup_environment.sh (300 tokens)
└── tests/
    ├── debug_ocr.py (600 tokens)
    ├── debug_ocr_provider.py (100 tokens)
    ├── test_advanced_ocr.py (1100 tokens)
    ├── test_cloud_mode.py (2.5k tokens)
    ├── test_converter.py (900 tokens)
    ├── test_enhanced_layout.py (1000 tokens)
    ├── test_enhanced_library.py (1600 tokens)
    ├── test_enhanced_pdf_processor.py (1300 tokens)
    ├── test_html_generation.py (500 tokens)
    ├── test_json_structure.py (1200 tokens)
    ├── test_ocr_with_real_image.py (2.3k tokens)
    ├── test_real_files.py (2.1k tokens)
    ├── test_real_files_enhanced.py (2.5k tokens)
    └── test_real_json_conversion.py (700 tokens)
```
## /.github/workflows/claude-code-review.yml
```yml path="/.github/workflows/claude-code-review.yml"
name: Claude Code Review

on:
  pull_request:
    types: [opened, synchronize]
    # Optional: Only run on specific file changes
    # paths:
    #   - "src/**/*.ts"
    #   - "src/**/*.tsx"
    #   - "src/**/*.js"
    #   - "src/**/*.jsx"

jobs:
  claude-review:
    # Optional: Filter by PR author
    # if: |
    #   github.event.pull_request.user.login == 'external-contributor' ||
    #   github.event.pull_request.user.login == 'new-developer' ||
    #   github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR'
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: read
      issues: read
      id-token: write
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: Run Claude Code Review
        id: claude-review
        uses: anthropics/claude-code-action@v1
        with:
          claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
          prompt: |
            Please review this pull request and provide feedback on:
            - Code quality and best practices
            - Potential bugs or issues
            - Performance considerations
            - Security concerns
            - Test coverage

            Use the repository's CLAUDE.md for guidance on style and conventions. Be constructive and helpful in your feedback.

            Use `gh pr comment` with your Bash tool to leave your review as a comment on the PR.

          # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
          # or https://docs.anthropic.com/en/docs/claude-code/sdk#command-line for available options
          claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*)"'
```
## /.github/workflows/claude.yml
```yml path="/.github/workflows/claude.yml"
name: Claude Code

on:
  issue_comment:
    types: [created]
  pull_request_review_comment:
    types: [created]
  issues:
    types: [opened, assigned]
  pull_request_review:
    types: [submitted]

jobs:
  claude:
    if: |
      (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
      (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
      (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
      (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: read
      issues: read
      id-token: write
      actions: read # Required for Claude to read CI results on PRs
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: Run Claude Code
        id: claude
        uses: anthropics/claude-code-action@v1
        with:
          claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}

          # This is an optional setting that allows Claude to read CI results on PRs
          additional_permissions: |
            actions: read

          # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
          # prompt: 'Update the pull request description to include a summary of changes.'

          # Optional: Add claude_args to customize behavior and configuration
          # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
          # or https://docs.anthropic.com/en/docs/claude-code/sdk#command-line for available options
          # claude_args: '--model claude-opus-4-1-20250805 --allowed-tools Bash(gh pr:*)'
```
## /.gitignore
```gitignore path="/.gitignore"
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*.pyo
*.pyd
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be added to the global gitignore or merged into this project gitignore. For a PyCharm
# project, it is recommended to include the following files:
# .idea/
# *.iml
# *.ipr
# *.iws
# IDE
.vscode/
.idea/
# OS
.DS_Store
Thumbs.db
# Project specific
*.txt
*.pdf
*.docx
*.xlsx
sample_*
document_*
# Virtual environments
venv/
.env/
.venv/
ENV/
# PaddleOCR model cache
.paddlex/
# Logs
*.log
# Test outputs
*.out
*.tmp
# VSCode
.vscode/
.playwright-mcp/
examples/
venv1/
```
## /CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
DocStrange is a Python library for extracting and converting documents (PDFs, Word, Excel, PowerPoint, images, URLs) into multiple formats (Markdown, JSON, CSV, HTML) with intelligent content extraction and advanced OCR capabilities.
The library offers two processing modes:
- **Cloud Mode (default)**: Instant conversion using cloud API
- **GPU Mode**: Local processing with GPU acceleration
## Commands
### Development Setup
```bash
# Install in development mode with all dependencies
pip install -e ".[dev]"
# Install with local LLM support (for enhanced JSON extraction)
pip install -e ".[local-llm]"
# Alternative setup script
python scripts/setup_dev.py
```
### Testing
```bash
# Run all tests
python -m pytest tests/ -v
# Run specific test file
python -m pytest tests/test_enhanced_pdf_processor.py -v
# Run with coverage
python -m pytest tests/ --cov=docstrange --cov-report=html
```
### Code Quality
```bash
# Format code with black
black docstrange/ tests/
# Sort imports
isort docstrange/ tests/
# Run linting
flake8 docstrange/ tests/
# Type checking
mypy docstrange/
```
### Building and Distribution
```bash
# Build package
python -m build
# Upload to PyPI (requires credentials)
python -m twine upload dist/*
```
## Architecture
### Core Components
**DocumentExtractor** (`docstrange/extractor.py`)
- Main entry point for document conversion
- Determines processing mode (cloud/cpu/gpu)
- Routes files to appropriate processors
- Handles authentication for cloud mode
**Processor Classes** (`docstrange/processors/`)
- `CloudProcessor`: Handles cloud-based processing via Nanonets API
- `GPUProcessor`: Local GPU-accelerated processing with neural models
- `PDFProcessor`, `DOCXProcessor`, etc.: Format-specific processors
- All processors inherit from `BaseProcessor`
**Pipeline Components** (`docstrange/pipeline/`)
- `NeuralDocumentProcessor`: Core neural processing for local modes
- `LayoutDetector`: Detects document structure and layout
- `OCRService`: Manages OCR engines (EasyOCR, PaddleOCR)
- `NanonetsProcessor`: Cloud API integration
**Services** (`docstrange/services/`)
- `AuthService`: Handles OAuth authentication for cloud mode
- `OllamaService`: Local LLM integration for enhanced JSON extraction
**Result Classes** (`docstrange/result.py`)
- `ConversionResult`: Base result class with extraction methods
- `GPUConversionResult`: Enhanced result for GPU processing
- `CloudConversionResult`: Result wrapper for cloud processing
### Processing Flow
1. **Document Input** → DocumentExtractor.extract()
2. **Mode Selection**: Cloud (default) | CPU | GPU
3. **Format Detection**: Identify file type and route to processor
4. **Processing**:
- Cloud: Upload to API → Process → Return results
- Local: Load document → OCR → Layout detection → Structure extraction
5. **Output Generation**: Markdown | JSON | CSV | HTML | Text
### Key Design Patterns
- **Factory Pattern**: DocumentExtractor creates appropriate processor instances
- **Strategy Pattern**: Different processors for different file formats
- **Chain of Responsibility**: OCR fallback mechanism (EasyOCR → PaddleOCR)
- **Caching**: Authentication tokens and model downloads are cached
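A minimal sketch of that OCR fallback chain, using hypothetical engine wrappers rather than the library's actual `OCRService` API:
```python
from typing import Callable, List, Optional

def run_with_fallback(image_path: str, engines: List[Callable[[str], str]]) -> Optional[str]:
    """Try each OCR engine in order; return the first non-empty result."""
    for engine in engines:
        try:
            text = engine(image_path)
            if text and text.strip():
                return text
        except Exception:
            # This engine failed; fall through to the next one in the chain.
            continue
    return None

# Hypothetical wrappers; the documented chain is EasyOCR -> PaddleOCR.
def easyocr_engine(path: str) -> str:
    raise NotImplementedError("placeholder for an EasyOCR call")

def paddleocr_engine(path: str) -> str:
    raise NotImplementedError("placeholder for a PaddleOCR call")

text = run_with_fallback("scan.png", [easyocr_engine, paddleocr_engine])
```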
## Processing Modes
### Cloud Mode (Default)
- No local setup required
- Rate limits: Limited daily calls (free) or 10k/month (authenticated)
- Authentication: `docstrange login` or API key
- Best for: Quick processing without GPU
### GPU Mode
- Force with `gpu=True` parameter
- Requires CUDA-compatible GPU
- Fastest local processing
- Best for: Batch processing, high-volume workloads
## Authentication & Rate Limits
### Free Tier
- Limited daily API calls
- No authentication required
### Authenticated Access (10k docs/month)
```bash
# Browser-based login (recommended)
docstrange login
# Check status
docstrange --login
# Logout
docstrange --logout
```
### API Key Access (10k docs/month)
- Get key from https://app.nanonets.com/#/keys
- Pass via `api_key` parameter or `NANONETS_API_KEY` env var
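Both options in Python (the key value is a placeholder):
```python
import os
from docstrange import DocumentExtractor

# Option 1: pass the key explicitly.
extractor = DocumentExtractor(api_key="your_api_key_here")

# Option 2: rely on the NANONETS_API_KEY environment variable
# (normally exported in the shell; set here only for illustration).
os.environ["NANONETS_API_KEY"] = "your_api_key_here"
extractor = DocumentExtractor()
```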
## MCP Server Integration
The repository includes an MCP server for Claude Desktop integration (local development only):
### Setup
1. Install: `pip install -e ".[dev]"`
2. Configure in `~/Library/Application Support/Claude/claude_desktop_config.json`:
```json
{
"mcpServers": {
"docstrange": {
"command": "python3",
"args": ["/path/to/docstrange/mcp_server_module/server.py"]
}
}
}
```
### Key Features
- Token-aware document processing
- Hierarchical navigation for large documents
- Smart chunking based on token limits
- Document search and section extraction
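A rough illustration of token-limited chunking using `tiktoken` (one of the MCP server's dependencies); the function and its parameters are illustrative, not the server's actual implementation:
```python
from typing import List
import tiktoken

def chunk_by_tokens(text: str, max_tokens: int = 4000) -> List[str]:
    """Split text into pieces that each fit within max_tokens."""
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    return [
        enc.decode(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]
```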
## Dependencies
### Core Dependencies
- `pdf2image`: PDF to image conversion
- `python-docx`, `python-pptx`, `openpyxl`: Office formats
- `beautifulsoup4`, `markdownify`: HTML/Markdown conversion
- `Pillow`, `pdf2image`: Image processing
### ML/OCR Dependencies
- `easyocr`: Primary OCR engine
- `paddleocr`: Fallback OCR (optional)
- `docling-ibm-models`: Layout detection
- `transformers`, `huggingface_hub`: Model management
### Optional Dependencies
- `ollama`: Local LLM for enhanced JSON extraction
- `mcp`, `tiktoken`: MCP server support (Python 3.10+)
## Environment Variables
- `NANONETS_API_KEY`: API key for cloud processing
- `OLLAMA_HOST`: Ollama server URL (default: http://localhost:11434)
- `HF_HOME`: Hugging Face cache directory for models
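For reference, how these defaults could be resolved in Python (illustrative only; not the library's actual configuration code):
```python
import os

# NANONETS_API_KEY: absent means the anonymous free tier is used.
api_key = os.environ.get("NANONETS_API_KEY")

# OLLAMA_HOST: documented default is the local Ollama server.
ollama_host = os.environ.get("OLLAMA_HOST", "http://localhost:11434")

# HF_HOME: absent means Hugging Face falls back to its standard cache directory.
hf_home = os.environ.get("HF_HOME")
```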
## Common Tasks
### Extract specific fields from documents
```python
result = extractor.extract("invoice.pdf")
fields = result.extract_data(specified_fields=["invoice_number", "total_amount"])
```
### Process with JSON schema
```python
schema = {"invoice_number": "string", "total_amount": "number"}
structured = result.extract_data(json_schema=schema)
```
### Force local processing
```python
# GPU mode (requires CUDA)
extractor = DocumentExtractor(gpu=True)
```
## Error Handling
The library uses custom exceptions:
- `ConversionError`: General conversion failures
- `UnsupportedFormatError`: Unknown file format
- `FileNotFoundError`: Missing input file
Cloud mode automatically retries on transient failures.
Local modes fall back through OCR engines if one fails.
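For example, a caller can handle these cases explicitly (a minimal sketch using the exceptions exported from `docstrange`):
```python
from docstrange import DocumentExtractor, ConversionError, UnsupportedFormatError

extractor = DocumentExtractor()
try:
    result = extractor.extract("document.pdf")
    print(result.extract_markdown())
except UnsupportedFormatError:
    print("This file type is not supported.")
except ConversionError as exc:
    print(f"Conversion failed: {exc}")
```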
## How to code
- If you are making any frontend changes, use Playwright to test them and confirm that what you implemented actually works in the web interface
## /README.md

# <img src="https://public-vlms.s3.us-west-2.amazonaws.com/docstrange_logo.svg" alt="DocStrange" width="32" style="vertical-align: middle; margin-right: 8px;"> DocStrange
> 🚀 **[Try DocStrange Online →](https://docstrange.nanonets.com/)**
DocStrange converts documents to Markdown, JSON, CSV, and HTML quickly and accurately.
- Converts PDF, image, PPTX, DOCX, XLSX, and URL files.
- Formats tables into clean, LLM-optimized Markdown.
- Powered by an upgraded 7B model for higher accuracy and deeper document understanding.
- Extracts text from images and scanned documents with advanced OCR.
- Removes page artifacts for clean, readable output.
- Does structured extraction, given specific fields or a JSON schema.
- Includes a built-in, local Web UI for easy drag-and-drop conversion.
- Offers a free cloud API for instant processing or a 100% private, local mode.
- Works on GPU or CPU when running locally.
- Integrates with Claude Desktop via an MCP server for intelligent document navigation.
---

## Processing Modes
> **☁️ Free cloud processing: up to 10,000 docs per month!**
> Extract document data instantly with cloud processing - no complex setup needed.

> **🔒 Local Processing!**
> Use `gpu` mode for 100% local processing - no data sent anywhere, everything stays on your machine.
## **What's New**
**August 2025**
- 🚀 **Major Model Upgrade**: The core model has been upgraded to **7B parameters**, delivering significantly higher accuracy and deeper understanding of complex documents.
- 🖥️ **Local Web Interface**: Introducing a built-in, local GUI. Now you can convert documents with a simple drag-and-drop interface, 100% offline.
---
## About
Convert and extract data from PDF, DOCX, images, and more into clean Markdown and structured JSON. Plus: Advanced table extraction, 100% local processing, and a built-in web UI.
`DocStrange` is a Python library for converting a wide range of document formats—including **PDF**, **DOCX**, **PPTX**, **XLSX**, and **images** — into clean, usable data. It produces LLM-optimized **Markdown**, structured **JSON** (with schema support), **HTML**, and **CSV** outputs, making it an ideal tool for preparing content for RAG pipelines and other AI applications.
The library offers both a powerful cloud API and a 100% private, offline mode that runs locally on your GPU. Developed by **Nanonets**, DocStrange is built on a powerful pipeline of OCR and layout detection models and currently requires **Python >=3.8**.
**To report a bug or request a feature, [please file an issue](https://github.com/NanoNets/docstrange/issues). To ask a question or request assistance, please use the [discussions forum](https://github.com/NanoNets/docstrange/discussions).**
---
## **How DocStrange Differs**
`DocStrange` focuses on end-to-end document understanding (OCR → layout → tables → clean Markdown or structured JSON) that you can run 100% locally. It is designed to deliver high-quality results from scans and photos without requiring the integration of multiple services.
- **vs. Cloud AI Services (like AWS Textract)**: `DocStrange` offers a completely private, local processing option and gives you full control over the conversion pipeline.
- **vs. Orchestration Frameworks (like LangChain)**: `DocStrange` is a ready-to-use parsing pipeline, not just a framework. It handles the complex OCR and layout analysis so you don't have to build it yourself.
- **vs. Other Document Parsers**: `DocStrange` is specifically built for robust OCR on scans and phone photos, not just digitally-native PDFs.
### **When to Pick DocStrange**
- You need a **free cloud API** to extract information in a structured format (Markdown, JSON, CSV, HTML) from different document types.
- You need **local processing** for privacy and compliance.
- You are working with **scans, phone photos, or receipts** where high-quality OCR is critical.
- You need a **fast path to clean Markdown or structured JSON** without training a model.
---
## **Examples**
Try the live demo: Test `DocStrange` instantly in your browser with no installation required at [docstrange.nanonets.com](https://docstrange.nanonets.com/)
**See it in action:**

<!--
**Example outputs: Here's a quick preview of the quality of output**
| Document Type | Source File | Output (Markdown) | Output (JSON) | Output (CSV) |
| --- | --- | --- | --- | --- |
| **Invoice PDF** | invoice.pdf | View Markdown | View JSON | View CSV |
| **Research Paper** | paper.pdf | View Markdown | View JSON | NA |
| **Word Document** | report.docx | View Markdown | View JSON | NA |
| **Scanned Invoice** | [Ziebart.JPG](https://nanonets.com/media/1587320232578_ziebart.jpeg) | View Markdown | View JSON | View CSV | -->
---
## **Installation**
Install the library using pip:
```bash
pip install docstrange
```
## **Quick Start**
> 💡 **New to DocStrange?** Try the [online demo](https://docstrange.nanonets.com/) first - no installation needed!
**1. Convert any Document to LLM-Ready Markdown**
This is the most common use case. Turn a complex PDF or DOCX file into clean, structured Markdown, perfect for RAG pipelines and other LLM applications.
```python
from docstrange import DocumentExtractor
# Initialize extractor (cloud mode by default)
extractor = DocumentExtractor()
# Convert any document to clean markdown
result = extractor.extract("document.pdf")
markdown = result.extract_markdown()
print(markdown)
```
**2. Extract Structured Data as JSON**
Go beyond plain text and extract all detected entities and content from your document into a structured JSON format.
```python
from docstrange import DocumentExtractor
# Extract document as structured JSON
extractor = DocumentExtractor()
result = extractor.extract("document.pdf")
# Get all important data as flat JSON
json_data = result.extract_data()
print(json_data)
```
**3. Extract Specific Fields from a PDF or Invoice**
Target only the key-value data you need, such as extracting the invoice_number or total_amount directly from a document.
```python
from docstrange import DocumentExtractor
# Extract only the fields you need
extractor = DocumentExtractor()
result = extractor.extract("invoice.pdf")
# Specify exactly which fields to extract
fields = result.extract_data(specified_fields=[
"invoice_number", "total_amount", "vendor_name", "due_date"
])
print(fields)
```
**4. Extract with Custom JSON Schema**
Ensure the structure of your output by providing a custom JSON schema. This is ideal for getting reliable, nested data structures for applications that process contracts or complex forms.
```python
from docstrange import DocumentExtractor
# Extract data conforming to your schema
extractor = DocumentExtractor()
result = extractor.extract("contract.pdf")
# Define your required structure
schema = {
"contract_number": "string",
"parties": ["string"],
"total_value": "number",
"start_date": "string",
"terms": ["string"]
}
structured_data = result.extract_data(json_schema=schema)
print(structured_data)
```
**Local Processing**
For complete privacy and offline capability, run DocStrange entirely on your own machine using GPU processing.
```python
# Force local GPU processing (requires CUDA)
extractor = DocumentExtractor(gpu=True)
```
---
## Local Web Interface
💡 Want a GUI? Run the simple, drag-and-drop local web interface for private, offline document conversion.
For users who prefer a graphical interface, DocStrange includes a powerful, self-hosted web UI. This allows for easy drag-and-drop conversion of PDF, DOCX, and other files directly in your browser, with 100% private, offline processing on your own GPU. The interface automatically downloads required models on its first run.
### How to get started?
1. **Install with web dependencies:**
```bash
pip install "docstrange[web]"
```
2. **Run the web interface:**
```bash
# Method 1: Using the CLI command
docstrange web
# Method 2: Using Python module
python -m docstrange.web_app
# Method 3: Direct Python import
python -c "from docstrange.web_app import run_web_app; run_web_app()"
```
3. **Open your browser:** Navigate to `http://localhost:8000` (or the port shown in the terminal)
### **Features of DocStrange's Local Web Interface:**
- 🖱️ Drag & Drop Interface: Simply drag files onto the upload area.
- 📁 Multiple File Types: Supports PDF, DOCX, XLSX, PPTX, images, and more.
- ⚙️ Processing Modes: Choose between Cloud and Local GPU processing.
- 📊 Multiple Output Formats: Get Markdown, HTML, JSON, CSV, and Flat JSON.
- 🔒 Privacy Options: Choose between cloud processing (default) or local GPU processing.
- 📱 Responsive Design: Works on desktop, tablet, and mobile
### **Supported File Types:**
- **Documents**: PDF, DOCX, DOC, PPTX, PPT
- **Spreadsheets**: XLSX, XLS, CSV
- **Images**: PNG, JPG, JPEG, TIFF, BMP
- **Web**: HTML, HTM
- **Text**: TXT
### **Processing Modes:**
- **Cloud processing (default):** For instant, zero-setup conversion, head over to [docstrange.nanonets.com](https://docstrange.nanonets.com/) - no setup required.
- **Local GPU**: Fastest local processing, requires CUDA support
### **Output Formats:**
- **Markdown**: Clean, structured text perfect for documentation
- **HTML**: Formatted output with styling and layout
- **CSV**: Table data in spreadsheet format
- **Flat JSON**: Simplified JSON structure
- **Specific Fields**: Only the fields you specify, pulled from the document
### **Advanced Usage:**
1. Run on a Custom Port:
```bash
# Run on a different port
docstrange web --port 8080
python -c "from docstrange.web_app import run_web_app; run_web_app(port=8080)"
```
2. Run in Development Mode:
```bash
# Run with debug mode for development
python -c "from docstrange.web_app import run_web_app; run_web_app(debug=True)"
```
3. Run on a Custom Host (to make it accessible on your local network):
```bash
# Make accessible from other devices on the network
python -c "from docstrange.web_app import run_web_app; run_web_app(host='0.0.0.0')"
```
### **Troubleshooting**
1. Port Already in Use:
```bash
# Use a different port
docstrange web --port 8001
```
2. GPU Not Available:
- The interface automatically detects GPU availability
- GPU option will be disabled if CUDA is not available
- An error is raised if GPU processing is requested while CUDA is unavailable
3. Model Download Issues:
- Models are downloaded automatically on first startup
- Check your internet connection during initial setup
- Download progress is shown in the terminal
4. Installation Issues:
```bash
# Install with all dependencies
pip install -e ".[web]"
# Or install Flask separately
pip install Flask
```
**Cloud Alternative**
Need cloud processing? Use the official DocStrange Cloud service: 🔗 **[docstrange.nanonets.com](https://docstrange.nanonets.com/)**
---
## Usage and Features
You can use DocStrange in three main ways: as a simple Web Interface, as a flexible Python Library, or as a powerful Command Line Interface (CLI). This section provides a summary of the library's key capabilities, followed by detailed guides and examples for each method.
**a. Convert Multiple File Types**
DocStrange natively handles a wide variety of formats, returning the most appropriate output for each.
```python
from docstrange import DocumentExtractor
extractor = DocumentExtractor()
# PDF document
pdf_result = extractor.extract("report.pdf")
print(pdf_result.extract_markdown())
# Word document
docx_result = extractor.extract("document.docx")
print(docx_result.extract_data())
# Excel spreadsheet
excel_result = extractor.extract("data.xlsx")
print(excel_result.extract_csv())
# PowerPoint presentation
pptx_result = extractor.extract("slides.pptx")
print(pptx_result.extract_html())
# Image with text
image_result = extractor.extract("screenshot.png")
print(image_result.extract_text())
# Web page
url_result = extractor.extract("https://example.com")
print(url_result.extract_markdown())
```
**b. Extract Tables to CSV**
Easily extracts all tables from a document into a clean CSV format.
```python
# Extract all tables from a document
result = extractor.extract("financial_report.pdf")
csv_data = result.extract_csv()
print(csv_data)
```
**c. Extract Specific Fields & Structured Data**
You can go beyond simple conversion and extract data in the exact structure you require. There are two ways to do this. You can either target and pull only the key-value data you need or ensure the structure of your output by providing a custom JSON schema.
```python
# Extract specific fields from any document
result = extractor.extract("invoice.pdf")
# Method 1: Extract specific fields
extracted = result.extract_data(specified_fields=[
"invoice_number",
"total_amount",
"vendor_name",
"due_date"
])
# Method 2: Extract using JSON schema
schema = {
"invoice_number": "string",
"total_amount": "number",
"vendor_name": "string",
"line_items": [{
"description": "string",
"amount": "number"
}]
}
structured = result.extract_data(json_schema=schema)
```
**d. Cloud Mode Usage Examples:**
Use DocStrange's cloud mode to extract precise, structured data from various documents by either specifying a list of fields to find or enforcing a custom JSON schema for the output. Authenticate with `docstrange login` or a free API key to get 10,000 documents/month.
```python
from docstrange import DocumentExtractor
# Default cloud mode (rate-limited without API key)
extractor = DocumentExtractor()
# Authenticated mode (10k docs/month) - run 'docstrange login' first
extractor = DocumentExtractor() # Auto-uses cached credentials
# With API key for 10k docs/month (alternative to login)
extractor = DocumentExtractor(api_key="your_api_key_here")
# Extract specific fields from invoice
result = extractor.extract("invoice.pdf")
# Extract key invoice information
invoice_fields = result.extract_data(specified_fields=[
"invoice_number",
"total_amount",
"vendor_name",
"due_date",
"items_count"
])
print("Extracted Invoice Fields:")
print(invoice_fields)
# Output: {"extracted_fields": {"invoice_number": "INV-001", ...}, "format": "specified_fields"}
# Extract structured data using schema
invoice_schema = {
"invoice_number": "string",
"total_amount": "number",
"vendor_name": "string",
"billing_address": {
"street": "string",
"city": "string",
"zip_code": "string"
},
"line_items": [{
"description": "string",
"quantity": "number",
"unit_price": "number",
"total": "number"
}],
"taxes": {
"tax_rate": "number",
"tax_amount": "number"
}
}
structured_invoice = result.extract_data(json_schema=invoice_schema)
print("Structured Invoice Data:")
print(structured_invoice)
# Output: {"structured_data": {...}, "schema": {...}, "format": "structured_json"}
# Extract from different document types
receipt = extractor.extract("receipt.jpg")
receipt_data = receipt.extract_data(specified_fields=[
"merchant_name", "total_amount", "date", "payment_method"
])
contract = extractor.extract("contract.pdf")
contract_schema = {
"parties": [{
"name": "string",
"role": "string"
}],
"contract_value": "number",
"start_date": "string",
"end_date": "string",
"key_terms": ["string"]
}
contract_data = contract.extract_data(json_schema=contract_schema)
```
**e. Chain with LLM**
The clean Markdown output is perfect for use in Retrieval-Augmented Generation (RAG) and other LLM workflows.
```python
# Perfect for LLM workflows
document_text = extractor.extract("research_paper.pdf").extract_markdown()
# Use with any LLM
response = your_llm_client.chat(
messages=[{
"role": "user",
"content": f"Summarize this research paper:\n\n{document_text}"
}]
)
```
### **Key Capabilities**
- **🌐 Universal Input**: Process a wide range of formats, including **PDF**, **DOCX**, **PPTX**, **XLSX**, images, and URLs.
- **🔒 Dual Processing Modes**: Choose between a cloud API for instant processing or **100% private, local processing** on your own CPU or GPU.
- **🤖 Intelligent Extraction**: Extract **specific fields** or enforce a nested **JSON schema** to get structured data output.
- **🖼️ Advanced OCR**: Handle scanned documents and images with an OCR pipeline that includes **multiple engine fallbacks**.
- **📊 Table & Structure Recognition**: Accurately **extract tables** and preserve document structure, producing clean, **LLM-optimized** output.
- **🖥️ Built-in Web UI**: Use the built-in **drag-and-drop web interface** for easy local conversions.
### **How It Works**
DocStrange uses a multi-stage process to create structured output from documents.
1. **Ingestion**: It natively handles various file formats, including PDF, DOCX, PPTX, images, and URLs.
2. **Layout Detection**: The library identifies the structure of the document, such as headers, paragraphs, lists, and tables, to preserve the original reading order.
3. **OCR & Text Extraction**: It employs advanced OCR for scanned documents and directly extracts text from digital files.
4. **Formatting & Cleaning**: The extracted content is converted into clean, LLM-optimized Markdown and other formats, removing page artifacts.
5. **Structured Extraction (Optional)**: If a schema or specific fields are provided, DocStrange uses an LLM to populate the desired JSON structure.
---
## Cloud API Tiers and Rate Limits
`DocStrange` offers free cloud processing with different tiers to ensure fair usage.
- **🔐 Authenticated Access (Recommended)**
- **Rate Limit**: **10,000 documents/month**.
- **Setup**: A single command: `docstrange login`.
- **Benefits**: Links to your Google account for a significantly higher free limit.
- **🔑 API Key Access (Alternative)**
- **Rate Limit**: **10,000 documents/month**.
- **Setup**: Get a free API key from [docstrange.nanonets.com](https://docstrange.nanonets.com/).
- **Usage**: Pass the API key when initializing the library.
```python
# Free tier usage (limited calls daily)
extractor = DocumentExtractor()
# Authenticated access (10k docs/month) - run 'docstrange login' first
extractor = DocumentExtractor() # Auto-uses cached credentials
# API key access (10k docs/month)
extractor = DocumentExtractor(api_key="your_api_key_here")
```
💡 **Tip**: Start with the anonymous free tier to test functionality, then authenticate with `docstrange login` for the full 10,000 documents/month limit.
---
## **Command Line Interface (CLI)**
💡 **Prefer a GUI?** Try the [web interface](https://docstrange.nanonets.com/) for drag-and-drop document conversion!
For automation, scripting, and batch processing, you can use DocStrange directly from your terminal.
**Authentication Commands**
```bash
# One-time login for free 10k docs/month (alternative to API key)
docstrange login
# Alternatively
docstrange --login
# Re-authenticate if needed
docstrange login --reauth
# Logout and clear cached credentials
docstrange --logout
```
**Document Processing**
```bash
# Basic conversion (cloud mode default - limited calls free!)
docstrange document.pdf
# Authenticated processing (10k docs/month for free after login)
docstrange document.pdf
# With API key for 10k docs/month access (alternative to login)
docstrange document.pdf --api-key YOUR_API_KEY
# Local processing modes
docstrange document.pdf --gpu-mode
# Different output formats
docstrange document.pdf --output json
docstrange document.pdf --output html
docstrange document.pdf --output csv
# Extract specific fields
docstrange invoice.pdf --output json --extract-fields invoice_number total_amount
# Extract with JSON schema
docstrange document.pdf --output json --json-schema schema.json
# Multiple files
docstrange *.pdf --output markdown
# Save to file
docstrange document.pdf --output-file result.md
# Comprehensive field extraction examples
docstrange invoice.pdf --output json --extract-fields invoice_number vendor_name total_amount due_date line_items
# Extract from different document types with specific fields
docstrange receipt.jpg --output json --extract-fields merchant_name total_amount date payment_method
docstrange contract.pdf --output json --extract-fields parties contract_value start_date end_date
# Using JSON schema files for structured extraction
docstrange invoice.pdf --output json --json-schema invoice_schema.json
docstrange contract.pdf --output json --json-schema contract_schema.json
# Combine with authentication for 10k docs/month access (after 'docstrange login')
docstrange document.pdf --output json --extract-fields title author date summary
# Or use API key for 10k docs/month access (alternative to login)
docstrange document.pdf --api-key YOUR_API_KEY --output json --extract-fields title author date summary
```
**Example schema.json file:**
```json
{
"invoice_number": "string",
"total_amount": "number",
"vendor_name": "string",
"billing_address": {
"street": "string",
"city": "string",
"zip_code": "string"
},
"line_items": [{
"description": "string",
"quantity": "number",
"unit_price": "number"
}]
}
```
## **API Reference for the Library**
This section details the main classes and methods for programmatic use.
**a. DocumentExtractor**
```python
DocumentExtractor(
api_key: str = None, # API key for 10k docs/month (or use 'docstrange login' for same limits)
model: str = None, # Model for cloud processing ("gemini", "openapi", "nanonets")
cpu: bool = False, # Force local CPU processing
gpu: bool = False # Force local GPU processing
)
```
**b. ConversionResult Methods**
```python
result.extract_markdown() -> str # Clean markdown output
result.extract_data( # Structured JSON
specified_fields: List[str] = None, # Extract specific fields
json_schema: Dict = None # Extract with schema
) -> Dict
result.extract_html() -> str # Formatted HTML
result.extract_csv() -> str # CSV format for tables
result.extract_text() -> str # Plain text
```
---
## **🤖 MCP Server for Claude Desktop (Local Development)**
The DocStrange repository includes an optional MCP (Model Context Protocol) server for local development that enables intelligent document processing in Claude Desktop with token-aware navigation.
> Note: The MCP server is designed for local development and is **not included** in the PyPI package. Clone the repository to use it locally.
**Features**
- **Smart Token Counting**: Automatically counts tokens and recommends processing strategy
- **Hierarchical Navigation**: Navigate documents by structure when they exceed context limits
- **Intelligent Chunking**: Automatically splits large documents into token-limited chunks
- **Advanced Search**: Search within documents and get contextual results
**Local Setup**
1. Clone the repository:
```bash
git clone https://github.com/nanonets/docstrange.git
cd docstrange
```
2. Install in development mode:
```bash
pip install -e ".[dev]"
```
3. Add to your Claude Desktop config (`~/Library/Application Support/Claude/claude_desktop_config.json`):
```json
{
"mcpServers": {
"docstrange": {
"command": "python3",
"args": ["/path/to/docstrange/mcp_server_module/server.py"]
}
}
}
```
4. Restart Claude Desktop
For detailed setup and usage, see [mcp_server_module/README.md](https://github.com/NanoNets/docstrange/blob/main/mcp_server_module/README.md)
---
## **The Nanonets Ecosystem**
`DocStrange` is a powerful open-source library developed and maintained by the team at **Nanonets**. The full Nanonets platform is an AI-driven solution for automating end-to-end document processing for businesses. The platform allows technical and non-technical teams to build complete automated document workflows.
## **Community, Support, & License**
This is an actively developed open-source project, and we welcome your feedback and contributions.
- **Discussions**: For questions, ideas, and to show what you've built, please visit our [**GitHub Discussions**](https://github.com/NanoNets/docstrange/discussions).
- **Issues**: For bug reports and feature requests, please open an [**Issue**](https://github.com/NanoNets/docstrange/issues).
- **Email**: For private inquiries, you can reach us at [**support@nanonets.com**](mailto:support@nanonets.com).
⭐ Star this repo if you find it helpful! Your support helps us improve the library.
**License:** This project is licensed under the **MIT License.**
## /docstrange/WEB_INTERFACE.md
# DocStrange Web Interface
A beautiful, modern web interface for the DocStrange document extraction library, inspired by the data-extraction-apis project design.
## Features
- **Modern UI**: Clean, responsive design with drag-and-drop file upload
- **Multiple Formats**: Support for PDF, Word, Excel, PowerPoint, images, and more
- **Output Options**: Convert to Markdown, HTML, JSON, CSV, or Flat JSON
- **Real-time Processing**: Live extraction with progress indicators
- **Download Results**: Save extracted content in various formats
- **Mobile Friendly**: Responsive design that works on all devices
## Quick Start
### 1. Install Dependencies
```bash
pip install docstrange[web]
```
### 2. Start the Web Interface
```bash
docstrange web
```
### 3. Open Your Browser
Navigate to: http://localhost:8000
## Usage
### File Upload
1. **Drag & Drop**: Simply drag your file onto the upload area
2. **Click to Browse**: Click the upload area to select a file from your computer
3. **Supported Formats**: PDF, Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), HTML, CSV, Text, Images (PNG, JPG, TIFF, BMP)
### Output Format Selection
Choose from multiple output formats:
- **Markdown**: Clean, structured markdown text
- **HTML**: Formatted HTML with styling
- **JSON**: Structured JSON data
- **CSV**: Table data in CSV format
- **Flat JSON**: Simplified JSON structure
### Results View
After processing, you can:
- **Preview**: View formatted content in the preview tab
- **Raw Output**: See the raw extracted text
- **Download**: Save results as text or JSON files
## API Endpoints
The web interface also provides REST API endpoints:
### Health Check
```
GET /api/health
```
### Get Supported Formats
```
GET /api/supported-formats
```
### Extract Document
```
POST /api/extract
Content-Type: multipart/form-data
Parameters:
- file: The document file to extract
- output_format: markdown, html, json, csv, flat-json
```
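For example, calling the extract endpoint from Python (a sketch using the `requests` package, assuming the server is running locally on port 8000 as described above; the exact response payload is not documented here):
```python
import requests

with open("document.pdf", "rb") as fh:
    response = requests.post(
        "http://localhost:8000/api/extract",
        files={"file": fh},                  # the document to extract
        data={"output_format": "markdown"},  # markdown, html, json, csv, flat-json
    )
response.raise_for_status()
print(response.text)
```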
## Configuration
### Environment Variables
- `FLASK_ENV`: Set to `development` for debug mode
- `MAX_CONTENT_LENGTH`: Maximum file size (default: 100MB)
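For reference, this is how such an upload limit is typically applied in a Flask app; whether `web_app.py` reads it from the environment in exactly this way is an assumption:
```python
import os
from flask import Flask

app = Flask(__name__)
# Default to 100 MB, overridable via the MAX_CONTENT_LENGTH environment variable (bytes).
app.config["MAX_CONTENT_LENGTH"] = int(os.environ.get("MAX_CONTENT_LENGTH", 100 * 1024 * 1024))
```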
### Customization
The web interface uses a modular design system:
- **CSS Variables**: Easy theming via CSS custom properties
- **Responsive Design**: Mobile-first approach
- **Component-based**: Reusable UI components
## Development
### Running in Development Mode
```bash
# Install development dependencies
pip install -e .
# Start with debug mode
python -m docstrange.web_app
```
### File Structure
```
docstrange/
├── web_app.py           # Flask application
├── templates/
│   └── index.html       # Main HTML template
└── static/
    ├── styles.css       # Design system CSS
    └── script.js        # Frontend JavaScript
```
### Testing
```bash
# Run the test script
python test_web_interface.py
```
## Troubleshooting
### Common Issues
1. **Port Already in Use**
```bash
# Use a different port
docstrange web --port 8080
```
2. **File Upload Fails**
- Check file size (max 100MB)
- Verify file format is supported
- Ensure proper file permissions
3. **Extraction Errors**
- Check console logs for detailed error messages
- Verify document is not corrupted
- Try different output formats
### Logs
The web interface logs to the console. Check for:
- File upload events
- Processing status
- Error messages
- API request details
## Contributing
To contribute to the web interface:
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Test thoroughly
5. Submit a pull request
## License
This web interface is part of the DocStrange project and is licensed under the MIT License.
## /docstrange/__init__.py
```py path="/docstrange/__init__.py"
"""
Document Data Extractor - Extract structured data from any document into LLM-ready formats.
"""
from .extractor import DocumentExtractor
from .result import ConversionResult
from .processors import GPUConversionResult, CloudConversionResult
from .exceptions import ConversionError, UnsupportedFormatError
from .config import InternalConfig
__version__ = "1.1.5"
__all__ = [
"DocumentExtractor",
"ConversionResult",
"GPUConversionResult",
"CloudConversionResult",
"ConversionError",
"UnsupportedFormatError",
"InternalConfig"
]
```
## /docstrange/cli.py
```py path="/docstrange/cli.py"
"""Command-line interface for docstrange."""
import argparse
import sys
import os
import json
from pathlib import Path
from typing import List
from .extractor import DocumentExtractor
from .exceptions import ConversionError, UnsupportedFormatError, FileNotFoundError
from . import __version__
def print_version():
"""Print version information."""
print(f"docstrange v{__version__}")
print("Convert any document, text, or URL into LLM-ready data format")
print("with advanced intelligent document processing capabilities.")
def print_supported_formats(extractor: DocumentExtractor):
"""Print supported formats in a nice format."""
print("Supported input formats:")
print()
formats = extractor.get_supported_formats()
# Group formats by category
categories = {
"Documents": [f for f in formats if f in ['.pdf', '.docx', '.doc', '.txt', '.text']],
"Data Files": [f for f in formats if f in ['.xlsx', '.xls', '.csv']],
"Presentations": [f for f in formats if f in ['.ppt', '.pptx']],
"Web": [f for f in formats if f == 'URLs'],
"Images": [f for f in formats if f in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif']],
"Web Files": [f for f in formats if f in ['.html', '.htm']]
}
for category, format_list in categories.items():
if format_list:
print(f" {category}:")
for fmt in format_list:
print(f" - {fmt}")
print()
def process_single_input(extractor: DocumentExtractor, input_item: str, output_format: str, verbose: bool = False) -> dict:
"""Process a single input item and return result with metadata."""
if verbose:
print(f"Processing: {input_item}", file=sys.stderr)
try:
# Check if it's a URL
if input_item.startswith(('http://', 'https://')):
if extractor.cloud_mode:
raise ConversionError("URL processing is not supported in cloud mode. Use local mode for URLs.")
result = extractor.extract_url(input_item)
input_type = "URL"
# Check if it's a file
elif os.path.exists(input_item):
result = extractor.extract(input_item)
input_type = "File"
# Treat as text
else:
if extractor.cloud_mode:
raise ConversionError("Text processing is not supported in cloud mode. Use local mode for text.")
result = extractor.extract_text(input_item)
input_type = "Text"
return {
"success": True,
"result": result,
"input_type": input_type,
"input_item": input_item
}
except FileNotFoundError:
return {
"success": False,
"error": "File not found",
"input_item": input_item
}
except UnsupportedFormatError:
return {
"success": False,
"error": "Unsupported format",
"input_item": input_item
}
except ConversionError as e:
return {
"success": False,
"error": f"Conversion error: {e}",
"input_item": input_item
}
except Exception as e:
return {
"success": False,
"error": f"Unexpected error: {e}",
"input_item": input_item
}
def handle_login(force_reauth: bool = False) -> int:
"""Handle login command."""
try:
from .services.auth_service import get_authenticated_token
print("\n🔐 DocStrange Authentication")
print("=" * 50)
token = get_authenticated_token(force_reauth=force_reauth)
if token:
print("✅ Authentication successful!")
# Get cached credentials to show user info
try:
from .services.auth_service import AuthService
auth_service = AuthService()
cached_creds = auth_service.get_cached_credentials()
if cached_creds and cached_creds.get('auth0_direct'):
print(f"👤 Logged in as: {cached_creds.get('user_email', 'Unknown')}")
print(f"👤 Name: {cached_creds.get('user_name', 'Unknown')}")
print(f"🔐 Via: Auth0 Google Login")
print(f"🔑 Access Token: {token[:12]}...{token[-4:]}")
print("💾 Credentials cached securely")
else:
print(f"🔑 Access Token: {token[:12]}...{token[-4:]}")
print("💾 Credentials cached securely")
except:
print(f"🔑 Access Token: {token[:12]}...{token[-4:]}")
print("💾 Credentials cached securely")
print("\n💡 You can now use DocStrange cloud features without specifying --api-key")
print("🌐 Your CLI is authenticated with the same Google account used on docstrange.nanonets.com")
return 0
else:
print("❌ Authentication failed.")
return 1
except ImportError:
print("❌ Authentication service not available.", file=sys.stderr)
return 1
except Exception as e:
print(f"❌ Authentication error: {e}", file=sys.stderr)
return 1
def handle_logout() -> int:
"""Handle logout command."""
try:
from .services.auth_service import clear_auth
clear_auth()
print("✅ Logged out successfully.")
print("💾 Cached authentication credentials cleared.")
return 0
except ImportError:
print("❌ Authentication service not available.", file=sys.stderr)
return 1
except Exception as e:
print(f"❌ Error clearing credentials: {e}", file=sys.stderr)
return 1
def main():
"""Main CLI function."""
parser = argparse.ArgumentParser(
description="Convert documents to LLM-ready formats with intelligent document processing",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Authentication (browser-based login)
docstrange login # One-click browser login
docstrange login --reauth # Force re-authentication
# Start web interface
docstrange web # Start web interface at http://localhost:8000
# Convert a PDF to markdown (default cloud mode)
docstrange document.pdf
# Convert with free API key with increased limits
docstrange document.pdf --api-key YOUR_API_KEY
# Force local GPU processing
docstrange document.pdf --gpu-mode
# Convert to different output formats
docstrange document.pdf --output html
docstrange document.pdf --output json
docstrange document.pdf --output csv # Extract tables as CSV
# Use specific model for cloud processing
docstrange document.pdf --model gemini
docstrange document.pdf --model openapi --output json
docstrange document.pdf --model nanonets --output csv
# Convert a URL (works in all modes)
docstrange https://example.com --output html
# Convert plain text (works in all modes)
docstrange "Hello world" --output json
# Convert multiple files
docstrange file1.pdf file2.docx file3.xlsx --output markdown
# Extract specific fields using cloud processing
docstrange invoice.pdf --output json --extract-fields invoice_number total_amount vendor_name
# Extract using JSON schema with cloud processing
docstrange document.pdf --output json --json-schema schema.json
# Save output to file
docstrange document.pdf --output-file output.md
# Use environment variable for API key
export NANONETS_API_KEY=your_api_key
docstrange document.pdf
# List supported formats
docstrange --list-formats
# Show version
docstrange --version
"""
)
parser.add_argument(
"input",
nargs="*",
help="Input file(s), URL(s), or text to extract"
)
parser.add_argument(
"--output", "-o",
choices=["markdown", "html", "json", "text", "csv"],
default="markdown",
help="Output format (default: markdown)"
)
# Processing mode arguments
parser.add_argument(
"--gpu-mode",
action="store_true",
help="Force local GPU processing (disables cloud mode, requires GPU)"
)
parser.add_argument(
"--api-key",
help="API key for increased cloud access (get it free from https://app.nanonets.com/#/keys)"
)
parser.add_argument(
"--model",
choices=["gemini", "openapi", "nanonets"],
help="Model to use for cloud processing (gemini, openapi, nanonets)"
)
parser.add_argument(
"--ollama-url",
default="http://localhost:11434",
help="Ollama server URL for local field extraction (default: http://localhost:11434)"
)
parser.add_argument(
"--ollama-model",
default="llama3.2",
help="Ollama model for local field extraction (default: llama3.2)"
)
parser.add_argument(
"--extract-fields",
nargs="+",
help="Extract specific fields using cloud processing (e.g., --extract-fields invoice_number total_amount)"
)
parser.add_argument(
"--json-schema",
help="JSON schema file for structured extraction using cloud processing"
)
parser.add_argument(
"--preserve-layout",
action="store_true",
default=True,
help="Preserve document layout (default: True)"
)
parser.add_argument(
"--include-images",
action="store_true",
help="Include images in output"
)
parser.add_argument(
"--ocr-enabled",
action="store_true",
help="Enable intelligent document processing for images and PDFs"
)
parser.add_argument(
"--output-file", "-f",
help="Output file path (if not specified, prints to stdout)"
)
parser.add_argument(
"--list-formats",
action="store_true",
help="List supported input formats and exit"
)
parser.add_argument(
"--version",
action="store_true",
help="Show version information and exit"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose output"
)
parser.add_argument(
"--login",
action="store_true",
help="Perform browser-based authentication login"
)
parser.add_argument(
"--reauth",
action="store_true",
help="Force re-authentication (use with --login)"
)
parser.add_argument(
"--logout",
action="store_true",
help="Clear cached authentication credentials"
)
args = parser.parse_args()
# Handle version flag
if args.version:
print_version()
return 0
# Handle list formats flag
if args.list_formats:
# Create an extractor to get supported formats
extractor = DocumentExtractor(
api_key=args.api_key,
model=args.model,
gpu=args.gpu_mode
)
print_supported_formats(extractor)
return 0
# Handle authentication commands
# Check if first argument is "login" command
if args.input and args.input[0] == "login":
force_reauth = "--reauth" in sys.argv
return handle_login(force_reauth)
# Handle web command
if args.input and args.input[0] == "web":
try:
from .web_app import run_web_app
print("Starting DocStrange web interface...")
print("Open your browser and go to: http://localhost:8000")
print("Press Ctrl+C to stop the server")
run_web_app(host='0.0.0.0', port=8000, debug=False)
return 0
except ImportError:
print("❌ Web interface not available. Install Flask: pip install Flask", file=sys.stderr)
return 1
# Handle login flags
if args.login or args.logout:
if args.logout:
return handle_logout()
else:
return handle_login(args.reauth)
# Check if input is provided
if not args.input:
parser.error("No input specified. Please provide file(s), URL(s), or text to extract.")
# Cloud mode is default. Without login/API key it's limited calls.
# Use 'docstrange login' (recommended) or --api-key for 10k docs/month for free.
# Initialize extractor
extractor = DocumentExtractor(
api_key=args.api_key,
model=args.model,
gpu=args.gpu_mode
)
if args.verbose:
mode = "local" if args.gpu_mode else "cloud"
print(f"Initialized extractor in {mode} mode:")
print(f" - Output format: {args.output}")
if mode == "cloud":
has_api_or_auth = bool(args.api_key or extractor.api_key)
print(f" - Auth: {'authenticated (10k/month) free calls' if has_api_or_auth else 'not authenticated (limited free calls)'}")
if args.model:
print(f" - Model: {args.model}")
else:
print(f" - Local processing: GPU")
print()
# Process inputs
results = []
errors = []
for i, input_item in enumerate(args.input, 1):
if args.verbose and len(args.input) > 1:
print(f"[{i}/{len(args.input)}] Processing: {input_item}", file=sys.stderr)
result = process_single_input(extractor, input_item, args.output, args.verbose)
if result["success"]:
results.append(result["result"])
if not args.verbose:
print(f"Processing ... : {input_item}", file=sys.stderr)
else:
errors.append(result)
print(f"❌ Failed: {input_item} - {result['error']}", file=sys.stderr)
# Check if we have any successful results
if not results:
print("❌ No files were successfully processed.", file=sys.stderr)
if errors:
print("Errors encountered:", file=sys.stderr)
for error in errors:
print(f" - {error['input_item']}: {error['error']}", file=sys.stderr)
return 1
# Generate output
if len(results) == 1:
# Single result
result = results[0]
if args.output == "markdown":
output_content = result.extract_markdown()
elif args.output == "html":
output_content = result.extract_html()
elif args.output == "json":
# Handle field extraction if specified
json_schema = None
if args.json_schema:
try:
with open(args.json_schema, 'r') as f:
json_schema = json.load(f)
except Exception as e:
print(f"Error loading JSON schema: {e}", file=sys.stderr)
sys.exit(1)
try:
result_json = result.extract_data(
specified_fields=args.extract_fields,
json_schema=json_schema,
)
output_content = json.dumps(result_json, indent=2)
except Exception as e:
print(f"Error during JSON extraction: {e}", file=sys.stderr)
sys.exit(1)
elif args.output == "csv":
try:
output_content = result.extract_csv(include_all_tables=True)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
else: # text
output_content = result.extract_text()
else:
# Multiple results - combine them
if args.output == "markdown":
output_content = "\n\n---\n\n".join(r.extract_markdown() for r in results)
elif args.output == "html":
output_content = "\n\n<hr>\n\n".join(r.extract_html() for r in results)
elif args.output == "json":
# Handle field extraction for multiple results
json_schema = None
if args.json_schema:
try:
with open(args.json_schema, 'r') as f:
json_schema = json.load(f)
except Exception as e:
print(f"Error loading JSON schema: {e}", file=sys.stderr)
sys.exit(1)
try:
extracted_results = []
for r in results:
result_json = r.extract_data(
specified_fields=args.extract_fields,
json_schema=json_schema,
)
extracted_results.append(result_json)
combined_json = {
"results": extracted_results,
"count": len(results),
"errors": [{"input": e["input_item"], "error": e["error"]} for e in errors] if errors else []
}
output_content = json.dumps(combined_json, indent=2)
except Exception as e:
print(f"Error during JSON extraction: {e}", file=sys.stderr)
sys.exit(1)
elif args.output == "csv":
csv_outputs = []
for i, r in enumerate(results):
try:
csv_content = r.extract_csv(include_all_tables=True)
if csv_content.strip():
csv_outputs.append(f"=== File {i + 1} ===\n{csv_content}")
except ValueError:
# Skip files without tables
continue
if not csv_outputs:
print("Error: No tables found in any of the input files", file=sys.stderr)
sys.exit(1)
output_content = "\n\n".join(csv_outputs)
else: # text
output_content = "\n\n---\n\n".join(r.extract_text() for r in results)
# Write output
if args.output_file:
try:
with open(args.output_file, 'w', encoding='utf-8') as f:
f.write(output_content)
print(f"✅ Output written to: {args.output_file}", file=sys.stderr)
except Exception as e:
print(f"❌ Failed to write output file: {e}", file=sys.stderr)
return 1
else:
print(output_content)
# Summary
if args.verbose or len(args.input) > 1:
print(f"\nSummary: {len(results)} successful, {len(errors)} failed", file=sys.stderr)
return 0 if not errors else 1
if __name__ == "__main__":
sys.exit(main())
```
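For reference, the JSON branch of the CLI above maps onto a handful of library calls. The following is a minimal sketch of that same flow in Python; the document path, schema file, and field names are placeholder assumptions, not files in this repository.

```python
# Sketch of the CLI's JSON branch as direct library calls ("invoice.pdf", "schema.json"
# and the field names below are placeholders).
import json

from docstrange.extractor import DocumentExtractor

extractor = DocumentExtractor()
result = extractor.extract("invoice.pdf")

# Optional JSON schema, mirroring --json-schema.
with open("schema.json", "r") as f:
    json_schema = json.load(f)

data = result.extract_data(
    specified_fields=["invoice_number", "total_amount"],  # mirrors --extract-fields
    json_schema=json_schema,
)
print(json.dumps(data, indent=2))
```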
## /docstrange/config.py
```py path="/docstrange/config.py"
# docstrange/config.py
class InternalConfig:
# Internal feature flags and defaults (not exposed to end users)
use_markdownify = True
ocr_provider = 'neural' # OCR provider to use (neural for docling models)
# PDF processing configuration
pdf_to_image_enabled = True # Convert PDF pages to images for OCR
pdf_image_dpi = 300 # DPI for PDF to image conversion
pdf_image_scale = 2.0 # Scale factor for better OCR accuracy
# Add other internal config options here as needed
# e.g. default_ocr_lang = 'en'
# e.g. enable_layout_aware_ocr = True
```
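These are plain class-level attributes, so pipeline code reads them directly from the class. A small sketch; the override at the end is only an illustration and assumes downstream processors consult the attribute at call time.

```python
# Sketch: reading (and, e.g. in tests, overriding) the internal feature flags.
from docstrange.config import InternalConfig

print(InternalConfig.ocr_provider)    # 'neural'
print(InternalConfig.pdf_image_dpi)   # 300

# Illustrative override; assumes downstream code reads the class attribute when it runs.
InternalConfig.pdf_image_dpi = 150
```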
## /docstrange/exceptions.py
```py path="/docstrange/exceptions.py"
"""Custom exceptions for the LLM Data Converter library."""
class ConversionError(Exception):
"""Raised when document conversion fails."""
pass
class UnsupportedFormatError(Exception):
"""Raised when the input format is not supported."""
pass
class FileNotFoundError(Exception):
"""Raised when the input file is not found."""
pass
class NetworkError(Exception):
"""Raised when network operations fail (e.g., URL fetching)."""
pass
```
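A short sketch of catching these exceptions around a conversion call; note that `FileNotFoundError` here is docstrange's own class, which shadows the builtin of the same name. The path is a placeholder.

```python
# Sketch: handling docstrange's custom exceptions ("report.xyz" is a placeholder path).
from docstrange.extractor import DocumentExtractor
from docstrange.exceptions import (
    ConversionError,
    UnsupportedFormatError,
    FileNotFoundError,  # docstrange's class, not the builtin
)

extractor = DocumentExtractor()
try:
    result = extractor.extract("report.xyz")
except FileNotFoundError as e:
    print(f"Missing file: {e}")
except UnsupportedFormatError as e:
    print(f"Unsupported format: {e}")
except ConversionError as e:
    print(f"Conversion failed: {e}")
```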
## /docstrange/extractor.py
```py path="/docstrange/extractor.py"
"""Main extractor class for handling document conversion."""
import os
import logging
from typing import List, Optional
from .processors import (
PDFProcessor,
DOCXProcessor,
TXTProcessor,
ExcelProcessor,
URLProcessor,
HTMLProcessor,
PPTXProcessor,
ImageProcessor,
CloudProcessor,
GPUProcessor,
)
from .result import ConversionResult
from .exceptions import ConversionError, UnsupportedFormatError, FileNotFoundError
from .utils.gpu_utils import should_use_gpu_processor
# Configure logging
logger = logging.getLogger(__name__)
class DocumentExtractor:
"""Main class for converting documents to LLM-ready formats."""
def __init__(
self,
preserve_layout: bool = True,
include_images: bool = True,
ocr_enabled: bool = True,
api_key: Optional[str] = None,
model: Optional[str] = None,
gpu: bool = False
):
"""Initialize the file extractor.
Args:
preserve_layout: Whether to preserve document layout
include_images: Whether to include images in output
ocr_enabled: Whether to enable OCR for image and PDF processing
api_key: API key for cloud processing (optional). Prefer 'docstrange login' for 10k docs/month; API key from https://app.nanonets.com/#/keys is an alternative
model: Model to use for cloud processing (gemini, openapi) - only for cloud mode
gpu: Force local GPU processing (disables cloud mode, requires GPU)
Note:
- Cloud mode is the default unless gpu is specified
- Without login or API key, limited calls per day
- For 10k docs/month, run 'docstrange login' (recommended) or use an API key from https://app.nanonets.com/#/keys
"""
self.preserve_layout = preserve_layout
self.include_images = include_images
self.api_key = api_key
self.model = model
self.gpu = gpu
# Determine processing mode
# Cloud mode is default unless GPU preference is explicitly set
self.cloud_mode = not self.gpu
# Check GPU availability if GPU preference is set
if self.gpu and not should_use_gpu_processor():
raise RuntimeError(
"GPU preference specified but no GPU is available. "
"Please ensure CUDA is installed and a compatible GPU is present."
)
# Default to True if not explicitly set
if ocr_enabled is None:
self.ocr_enabled = True
else:
self.ocr_enabled = ocr_enabled
# Try to get API key from environment if not provided
if self.cloud_mode and not self.api_key:
self.api_key = os.environ.get('NANONETS_API_KEY')
# If still no API key, try to get from cached credentials
if not self.api_key:
try:
from .services.auth_service import get_authenticated_token
cached_token = get_authenticated_token(force_reauth=False)
if cached_token:
self.api_key = cached_token
logger.info("Using cached authentication credentials")
except ImportError:
logger.debug("Authentication service not available")
except Exception as e:
logger.warning(f"Could not retrieve cached credentials: {e}")
# Initialize processors
self.processors = []
if self.cloud_mode:
# Cloud mode setup
cloud_processor = CloudProcessor(
api_key=self.api_key, # Can be None for rate-limited access
model_type=self.model,
preserve_layout=preserve_layout,
include_images=include_images
)
self.processors.append(cloud_processor)
if self.api_key:
logger.info("Cloud processing enabled with authenticated access (10k docs/month)")
else:
logger.info("Cloud processing enabled without authentication (limited free calls). Run 'docstrange login' for 10k docs/month free calls or pass api_key.")
# logger.warning("For increased limits , provide an API key from https://app.nanonets.com/#/keys" for free)
else:
# Local mode setup
logger.info("Local processing mode enabled")
self._setup_local_processors()
def authenticate(self, force_reauth: bool = False) -> bool:
"""
Perform browser-based authentication and update API key.
Args:
force_reauth: Force re-authentication even if cached credentials exist
Returns:
True if authentication successful, False otherwise
"""
try:
from .services.auth_service import get_authenticated_token
token = get_authenticated_token(force_reauth=force_reauth)
if token:
self.api_key = token
# Update cloud processor if it exists
for processor in self.processors:
if hasattr(processor, 'api_key'):
processor.api_key = token
logger.info("Updated processor with new authentication token")
return True
else:
return False
except ImportError:
logger.error("Authentication service not available")
return False
except Exception as e:
logger.error(f"Authentication failed: {e}")
return False
def _setup_local_processors(self):
"""Setup local processors based on GPU preferences."""
local_processors = [
PDFProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images, ocr_enabled=self.ocr_enabled),
DOCXProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
TXTProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
ExcelProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
HTMLProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
PPTXProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
ImageProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images, ocr_enabled=self.ocr_enabled),
URLProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images),
]
# Add GPU processor if GPU preference is specified
if self.gpu:
logger.info("GPU preference specified - adding GPU processor with Nanonets OCR")
gpu_processor = GPUProcessor(preserve_layout=self.preserve_layout, include_images=self.include_images, ocr_enabled=self.ocr_enabled)
local_processors.append(gpu_processor)
self.processors.extend(local_processors)
def extract(self, file_path: str) -> ConversionResult:
"""Convert a file to internal format.
Args:
file_path: Path to the file to extract
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
UnsupportedFormatError: If the format is not supported
ConversionError: If conversion fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Find the appropriate processor
processor = self._get_processor(file_path)
if not processor:
raise UnsupportedFormatError(f"No processor found for file: {file_path}")
logger.info(f"Using processor {processor.__class__.__name__} for {file_path}")
# Process the file
return processor.process(file_path)
def convert_with_output_type(self, file_path: str, output_type: str) -> ConversionResult:
"""Convert a file with specific output type for cloud processing.
Args:
file_path: Path to the file to extract
output_type: Desired output type (markdown, flat-json, html)
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
UnsupportedFormatError: If the format is not supported
ConversionError: If conversion fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# For cloud mode, create a processor with the specific output type
if self.cloud_mode and self.api_key:
cloud_processor = CloudProcessor(
api_key=self.api_key,
output_type=output_type,
model_type=self.model, # Pass model as model_type
preserve_layout=self.preserve_layout,
include_images=self.include_images
)
if cloud_processor.can_process(file_path):
logger.info(f"Using cloud processor with output_type={output_type} for {file_path}")
return cloud_processor.process(file_path)
# Fallback to regular conversion for local mode
return self.extract(file_path)
def extract_url(self, url: str) -> ConversionResult:
"""Convert a URL to internal format.
Args:
url: URL to extract
Returns:
ConversionResult containing the processed content
Raises:
ConversionError: If conversion fails
"""
# Cloud mode doesn't support URL conversion
if self.cloud_mode:
raise ConversionError("URL conversion is not supported in cloud mode. Use local mode for URL processing.")
# Find the URL processor
url_processor = None
for processor in self.processors:
if isinstance(processor, URLProcessor):
url_processor = processor
break
if not url_processor:
raise ConversionError("URL processor not available")
logger.info(f"Converting URL: {url}")
return url_processor.process(url)
def extract_text(self, text: str) -> ConversionResult:
"""Convert plain text to internal format.
Args:
text: Plain text to extract
Returns:
ConversionResult containing the processed content
"""
# Cloud mode doesn't support text conversion
if self.cloud_mode:
raise ConversionError("Text conversion is not supported in cloud mode. Use local mode for text processing.")
metadata = {
"content_type": "text",
"processor": "TextConverter",
"preserve_layout": self.preserve_layout
}
return ConversionResult(text, metadata)
def is_cloud_enabled(self) -> bool:
"""Check if cloud processing is enabled and configured.
Returns:
True if cloud processing is available
"""
return self.cloud_mode and bool(self.api_key)
def get_processing_mode(self) -> str:
"""Get the current processing mode.
Returns:
String describing the current processing mode
"""
if self.cloud_mode and self.api_key:
return "cloud"
elif self.gpu:
return "gpu_forced"
elif should_use_gpu_processor():
return "gpu_auto"
else:
return "cloud"
def _get_processor(self, file_path: str):
"""Get the appropriate processor for the file.
Args:
file_path: Path to the file
Returns:
Processor that can handle the file, or None if none found
"""
# Define GPU-supported formats
gpu_supported_formats = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif', '.pdf']
# Check file extension
_, ext = os.path.splitext(file_path.lower())
# Check if GPU processor should be used for this file type
gpu_available = should_use_gpu_processor()
# Try GPU processor only if format is supported AND (gpu OR auto-gpu)
if ext in gpu_supported_formats and (self.gpu or (gpu_available and not self.gpu)):
for processor in self.processors:
if isinstance(processor, GPUProcessor):
if self.gpu:
logger.info(f"Using GPU processor with Nanonets OCR for {file_path} (GPU preference specified)")
else:
logger.info(f"Using GPU processor with Nanonets OCR for {file_path} (GPU available and format supported)")
return processor
# Fallback to normal processor selection
for processor in self.processors:
if processor.can_process(file_path):
# Skip GPU processor in fallback mode to avoid infinite loops
if isinstance(processor, GPUProcessor):
continue
logger.info(f"Using {processor.__class__.__name__} for {file_path}")
return processor
return None
def get_supported_formats(self) -> List[str]:
"""Get list of supported file formats.
Returns:
List of supported file extensions
"""
formats = []
for processor in self.processors:
if hasattr(processor, 'can_process'):
# This is a simplified way to get formats
# In a real implementation, you might want to store this info
if isinstance(processor, PDFProcessor):
formats.extend(['.pdf'])
elif isinstance(processor, DOCXProcessor):
formats.extend(['.docx', '.doc'])
elif isinstance(processor, TXTProcessor):
formats.extend(['.txt', '.text'])
elif isinstance(processor, ExcelProcessor):
formats.extend(['.xlsx', '.xls', '.csv'])
elif isinstance(processor, HTMLProcessor):
formats.extend(['.html', '.htm'])
elif isinstance(processor, PPTXProcessor):
formats.extend(['.ppt', '.pptx'])
elif isinstance(processor, ImageProcessor):
formats.extend(['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif'])
elif isinstance(processor, URLProcessor):
formats.append('URLs')
elif isinstance(processor, CloudProcessor):
# Cloud processor supports many formats, but we don't want duplicates
pass
elif isinstance(processor, GPUProcessor):
# GPU processor supports all image formats and PDFs
formats.extend(['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif', '.pdf'])
return list(set(formats)) # Remove duplicates
```
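A minimal usage sketch for `DocumentExtractor`, assuming the package's `__init__.py` re-exports it (otherwise import from `docstrange.extractor`); the file path is a placeholder.

```python
# Sketch: cloud mode is the default; without login or an API key calls are rate-limited.
from docstrange import DocumentExtractor  # assumes the top-level __init__ re-exports it

extractor = DocumentExtractor()
result = extractor.extract("contract.pdf")   # returns a ConversionResult
print(result.extract_markdown())

# Force local GPU processing instead (raises RuntimeError if no GPU is available):
# gpu_extractor = DocumentExtractor(gpu=True)
# print(gpu_extractor.get_processing_mode())  # "gpu_forced"
```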
## /docstrange/pipeline/__init__.py
```py path="/docstrange/pipeline/__init__.py"
"""Pipeline package for document processing and OCR."""
```
## /docstrange/pipeline/layout_detector.py
```py path="/docstrange/pipeline/layout_detector.py"
"""Layout detection and markdown generation for document processing."""
import re
import logging
from typing import List, Dict, Tuple
import numpy as np
logger = logging.getLogger(__name__)
class LayoutElement:
"""Represents a layout element with position and content."""
def __init__(self, text: str, x: int, y: int, width: int, height: int,
element_type: str = "text", confidence: float = 0.0):
self.text = text
self.x = x
self.y = y
self.width = width
self.height = height
self.element_type = element_type
self.confidence = confidence
self.bbox = (x, y, x + width, y + height)
def area(self) -> int:
"""Calculate area of the element."""
return self.width * self.height
def center_y(self) -> float:
"""Get center Y coordinate."""
return self.y + self.height / 2
def center_x(self) -> float:
"""Get center X coordinate."""
return self.x + self.width / 2
class LayoutDetector:
"""Handles layout detection and markdown generation."""
def __init__(self):
"""Initialize the layout detector."""
# Layout detection parameters
self._header_threshold = 0.15 # Top 15% of page considered header area
self._footer_threshold = 0.85 # Bottom 15% of page considered footer area
self._heading_height_threshold = 1.5 # Relative height for heading detection
self._list_patterns = [
r'^\d+\.', # Numbered list
r'^[•·▪▫◦‣⁃]', # Bullet points
r'^[-*+]', # Markdown list markers
r'^[a-zA-Z]\.', # Lettered list
]
def convert_to_structured_markdown(self, text_blocks: List[LayoutElement], image_size: Tuple[int, int]) -> str:
"""Convert text blocks to structured markdown with proper hierarchy."""
if not text_blocks:
return ""
# Sort blocks by vertical position (top to bottom), then horizontal (left to right)
text_blocks.sort(key=lambda x: (x.y, x.x))
# Group blocks into paragraphs based on vertical spacing and text analysis
paragraphs = self._group_into_paragraphs_advanced(text_blocks, image_size)
# Convert paragraphs to markdown
markdown_parts = []
for paragraph in paragraphs:
if paragraph:
# Determine if this paragraph is a heading, list, or regular text
paragraph_type = self._classify_paragraph(paragraph)
if paragraph_type == "heading":
level = self._determine_heading_level_from_text(paragraph)
markdown_parts.append(f"{'#' * level} {paragraph}")
elif paragraph_type == "list_item":
markdown_parts.append(f"- {paragraph}")
elif paragraph_type == "table_row":
markdown_parts.append(self._format_table_row(paragraph))
else:
markdown_parts.append(paragraph)
return '\n\n'.join(markdown_parts)
def _group_into_paragraphs_advanced(self, text_blocks: List[LayoutElement], image_size: Tuple[int, int]) -> List[str]:
"""Advanced paragraph grouping using multiple heuristics."""
if not text_blocks:
return []
# Calculate average text height for relative sizing
heights = [block.height for block in text_blocks]
avg_height = np.mean(heights) if heights else 20
# Group by proximity and text characteristics
paragraphs = []
current_paragraph = []
current_y = text_blocks[0].y
paragraph_threshold = 1.5 * avg_height # Dynamic threshold based on text size
for block in text_blocks:
# Check if this block is part of the same paragraph
if abs(block.y - current_y) <= paragraph_threshold:
current_paragraph.append(block)
else:
# Start new paragraph
if current_paragraph:
paragraph_text = self._join_paragraph_text_advanced(current_paragraph)
if paragraph_text:
paragraphs.append(paragraph_text)
current_paragraph = [block]
current_y = block.y
# Add the last paragraph
if current_paragraph:
paragraph_text = self._join_paragraph_text_advanced(current_paragraph)
if paragraph_text:
paragraphs.append(paragraph_text)
return paragraphs
def _join_paragraph_text_advanced(self, text_blocks: List[LayoutElement]) -> str:
"""Join text blocks into a coherent paragraph with better text processing."""
if not text_blocks:
return ""
# Sort blocks by reading order (left to right, top to bottom)
text_blocks.sort(key=lambda x: (x.y, x.x))
# Extract and clean text
texts = []
for block in text_blocks:
text = block.text.strip()
if text:
texts.append(text)
if not texts:
return ""
# Join with smart spacing
result = ""
for i, text in enumerate(texts):
if i == 0:
result = text
else:
# Check if we need a space before this text
prev_char = result[-1] if result else ""
curr_char = text[0] if text else ""
# Don't add space before punctuation
if curr_char in ',.!?;:':
result += text
# Don't add space after opening parenthesis/bracket
elif prev_char in '([{':
result += text
# Don't add space before closing parenthesis/bracket
elif curr_char in ')]}':
result += text
# Don't add space before common punctuation
elif curr_char in ';:':
result += text
# Handle hyphenation
elif prev_char == '-' and curr_char.isalpha():
result += text
else:
result += " " + text
# Post-process the text
result = self._post_process_text(result)
return result.strip()
def _post_process_text(self, text: str) -> str:
"""Post-process text to improve readability."""
# Fix common OCR issues
text = text.replace('|', 'I') # Common OCR mistake
text = text.replace('0', 'o') # Common OCR mistake in certain contexts
text = text.replace('1', 'l') # Common OCR mistake in certain contexts
# Fix spacing issues
text = re.sub(r'\s+', ' ', text) # Multiple spaces to single space
text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text) # Fix sentence spacing
# Fix common OCR artifacts
text = re.sub(r'[^\w\s.,!?;:()[\]{}"\'-]', '', text) # Remove strange characters
return text
def _classify_paragraph(self, text: str) -> str:
"""Classify a paragraph as heading, list item, table row, or regular text."""
text = text.strip()
# Check if it's a list item
if self._is_list_item(text):
return "list_item"
# Check if it's a table row
if self._is_table_row(text):
return "table_row"
# Check if it's a heading (short text, ends with period, or all caps)
if len(text.split()) <= 5 and (text.endswith('.') or text.isupper()):
return "heading"
return "text"
def _determine_heading_level_from_text(self, text: str) -> int:
"""Determine heading level based on text characteristics."""
text = text.strip()
# Short text is likely a higher level heading
if len(text.split()) <= 3:
return 1
elif len(text.split()) <= 5:
return 2
else:
return 3
def _is_list_item(self, text: str) -> bool:
"""Check if text is a list item."""
text = text.strip()
for pattern in self._list_patterns:
if re.match(pattern, text):
return True
return False
def _is_table_row(self, text: str) -> bool:
"""Check if text might be a table row."""
# Simple heuristic: if text contains multiple tab-separated or pipe-separated parts
if '|' in text or '\t' in text:
return True
# Check for regular spacing that might indicate table columns
words = text.split()
if len(words) >= 4: # More words likely indicate table data
# Check if there are multiple spaces between words (indicating columns)
if ' ' in text: # Double spaces often indicate column separation
return True
return False
def _format_table_row(self, text: str) -> str:
"""Format text as a table row."""
# Split by common table separators
if '|' in text:
cells = [cell.strip() for cell in text.split('|')]
elif '\t' in text:
cells = [cell.strip() for cell in text.split('\t')]
else:
# Try to split by multiple spaces
cells = [cell.strip() for cell in re.split(r'\s{2,}', text)]
# Format as markdown table row
return '| ' + ' | '.join(cells) + ' |'
def join_text_properly(self, texts: List[str]) -> str:
"""Join text words into proper sentences and paragraphs."""
if not texts:
return ""
# Clean and join text
cleaned_texts = []
for text in texts:
# Remove extra whitespace
text = text.strip()
if text:
cleaned_texts.append(text)
if not cleaned_texts:
return ""
# Join with spaces, but be smart about punctuation
result = ""
for i, text in enumerate(cleaned_texts):
if i == 0:
result = text
else:
# Check if we need a space before this word
prev_char = result[-1] if result else ""
curr_char = text[0] if text else ""
# Don't add space before punctuation
if curr_char in ',.!?;:':
result += text
# Don't add space after opening parenthesis/bracket
elif prev_char in '([{':
result += text
# Don't add space before closing parenthesis/bracket
elif curr_char in ')]}':
result += text
else:
result += " " + text
return result.strip()
def create_layout_element_from_block(self, block_data: List[Dict]) -> LayoutElement:
"""Create a LayoutElement from a block of text data."""
if not block_data:
return LayoutElement("", 0, 0, 0, 0)
# Sort by line_num and word_num to maintain reading order
block_data.sort(key=lambda x: (x['line_num'], x['word_num']))
# Extract text and position information
texts = [item['text'] for item in block_data]
x_coords = [item['x'] for item in block_data]
y_coords = [item['y'] for item in block_data]
widths = [item['width'] for item in block_data]
heights = [item['height'] for item in block_data]
confidences = [item['conf'] for item in block_data]
# Calculate bounding box
min_x = min(x_coords)
min_y = min(y_coords)
max_x = max(x + w for x, w in zip(x_coords, widths))
max_y = max(y + h for y, h in zip(y_coords, heights))
# Join text with proper spacing
text = self.join_text_properly(texts)
return LayoutElement(
text=text,
x=min_x,
y=min_y,
width=max_x - min_x,
height=max_y - min_y,
element_type="text",
confidence=np.mean(confidences) if confidences else 0.0
)
```
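A small sketch of driving `LayoutDetector` with hand-built `LayoutElement` blocks; the coordinates and texts are invented for illustration.

```python
# Sketch: hand-built blocks run through the heading/list/paragraph heuristics above.
from docstrange.pipeline.layout_detector import LayoutDetector, LayoutElement

blocks = [
    LayoutElement("QUARTERLY REPORT", x=40, y=30, width=300, height=40),         # short + all caps -> heading
    LayoutElement("1. Revenue grew in Q3.", x=40, y=120, width=500, height=20),   # list-style prefix -> list item
    LayoutElement("Operating costs stayed roughly flat versus the prior quarter.",
                  x=40, y=200, width=520, height=20),                             # regular paragraph
]

detector = LayoutDetector()
print(detector.convert_to_structured_markdown(blocks, image_size=(800, 1000)))
```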
## /docstrange/pipeline/model_downloader.py
```py path="/docstrange/pipeline/model_downloader.py"
"""Model downloader utility for downloading pre-trained models from Hugging Face."""
import logging
import os
from pathlib import Path
from typing import Optional
import requests
from tqdm import tqdm
from ..utils.gpu_utils import is_gpu_available, get_gpu_info
logger = logging.getLogger(__name__)
class ModelDownloader:
"""Downloads pre-trained models from Hugging Face or Nanonets S3."""
# Nanonets S3 model URLs (primary source)
S3_BASE_URL = "https://public-vlms.s3-us-west-2.amazonaws.com/llm-data-extractor"
# Model configurations with both S3 and HuggingFace sources
LAYOUT_MODEL = {
"s3_url": f"{S3_BASE_URL}/layout-model-v2.2.0.tar.gz",
"repo_id": "ds4sd/docling-models",
"revision": "v2.2.0",
"model_path": "model_artifacts/layout",
"cache_folder": "layout"
}
TABLE_MODEL = {
"s3_url": f"{S3_BASE_URL}/tableformer-model-v2.2.0.tar.gz",
"repo_id": "ds4sd/docling-models",
"revision": "v2.2.0",
"model_path": "model_artifacts/tableformer",
"cache_folder": "tableformer"
}
# Nanonets OCR model configuration
NANONETS_OCR_MODEL = {
"s3_url": f"{S3_BASE_URL}/Nanonets-OCR-s.tar.gz",
"repo_id": "nanonets/Nanonets-OCR-s",
"revision": "main",
"cache_folder": "nanonets-ocr",
}
# Note: EasyOCR downloads its own models automatically, no need for custom model
def __init__(self, cache_dir: Optional[Path] = None):
"""Initialize the model downloader.
Args:
cache_dir: Directory to cache downloaded models
"""
if cache_dir is None:
cache_dir = Path.home() / ".cache" / "docstrange" / "models"
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Model cache directory: {self.cache_dir}")
def download_models(self, force: bool = False, progress: bool = True) -> Path:
"""Download all required models.
Args:
force: Force re-download even if models exist
progress: Show download progress
Returns:
Path to the models directory
"""
logger.info("Downloading pre-trained models...")
# Auto-detect GPU for Nanonets model
gpu_available = is_gpu_available()
print("gpu_available", gpu_available)
if gpu_available:
logger.info("GPU detected - including Nanonets OCR model")
else:
logger.info("No GPU detected - skipping Nanonets OCR model (cloud mode)")
models_to_download = [
("Layout Model", self.LAYOUT_MODEL),
("Table Structure Model", self.TABLE_MODEL)
]
# Add Nanonets OCR model only if GPU is available
if gpu_available:
models_to_download.append(("Nanonets OCR Model", self.NANONETS_OCR_MODEL))
for model_name, model_config in models_to_download:
logger.info(f"Downloading {model_name}...")
self._download_model(model_config, force, progress)
logger.info("All models downloaded successfully!")
return self.cache_dir
def _download_model(self, model_config: dict, force: bool, progress: bool):
"""Download a specific model.
Args:
model_config: Model configuration dictionary
force: Force re-download
progress: Show progress
"""
model_dir = self.cache_dir / model_config["cache_folder"]
if model_dir.exists() and not force:
logger.info(f"Model already exists at {model_dir}")
return
# Create model directory
model_dir.mkdir(parents=True, exist_ok=True)
success = False
# Check if user prefers Hugging Face via environment variable
prefer_hf = os.environ.get("document_extractor_PREFER_HF", "false").lower() == "true"
# Try S3 first (Nanonets hosted models) unless user prefers HF
if not prefer_hf and "s3_url" in model_config:
try:
logger.info(f"Downloading from Nanonets S3: {model_config['s3_url']}")
self._download_from_s3(
s3_url=model_config["s3_url"],
local_dir=model_dir,
force=force,
progress=progress
)
success = True
logger.info("Successfully downloaded from Nanonets S3")
except Exception as e:
logger.warning(f"S3 download failed: {e}")
logger.info("Falling back to Hugging Face...")
# Fallback to Hugging Face if S3 fails
if not success:
self._download_from_hf(
repo_id=model_config["repo_id"],
revision=model_config["revision"],
local_dir=model_dir,
force=force,
progress=progress
)
def _download_from_hf(self, repo_id: str, revision: str, local_dir: Path,
force: bool, progress: bool):
"""Download model from Hugging Face using docling's logic.
Args:
repo_id: Hugging Face repository ID
revision: Git revision/tag
local_dir: Local directory to save model
force: Force re-download
progress: Show progress
"""
try:
from huggingface_hub import snapshot_download
from huggingface_hub.utils import disable_progress_bars
import huggingface_hub
if not progress:
disable_progress_bars()
# Check if models are already downloaded
if local_dir.exists() and any(local_dir.iterdir()):
logger.info(f"Model {repo_id} already exists at {local_dir}")
return
# Try to download with current authentication
try:
download_path = snapshot_download(
repo_id=repo_id,
force_download=force,
local_dir=str(local_dir),
revision=revision,
token=None, # Use default token if available
)
logger.info(f"Successfully downloaded {repo_id} to {download_path}")
except huggingface_hub.errors.HfHubHTTPError as e:
if "401" in str(e) or "Unauthorized" in str(e):
logger.warning(
f"Authentication failed for {repo_id}. This model may require a Hugging Face token.\n"
"To fix this:\n"
"1. Create a free account at https://huggingface.co/\n"
"2. Generate a token at https://huggingface.co/settings/tokens\n"
"3. Set it as environment variable: export HF_TOKEN='your_token_here'\n"
"4. Or run: huggingface-cli login\n\n"
"The library will continue with basic OCR capabilities."
)
# Don't raise the error, just log it and continue
return
else:
raise
except ImportError:
logger.error("huggingface_hub not available. Please install it: pip install huggingface_hub")
raise
except Exception as e:
logger.error(f"Failed to download model {repo_id}: {e}")
# Don't raise for authentication errors - allow fallback processing
if "401" not in str(e) and "Unauthorized" not in str(e):
raise
def get_model_path(self, model_type: str) -> Optional[Path]:
"""Get the path to a specific model.
Args:
model_type: Type of model ('layout', 'table', 'nanonets-ocr')
Returns:
Path to the model directory, or None if not found
"""
model_mapping = {
'layout': self.LAYOUT_MODEL["cache_folder"],
'table': self.TABLE_MODEL["cache_folder"],
'nanonets-ocr': self.NANONETS_OCR_MODEL["cache_folder"]
}
if model_type not in model_mapping:
logger.error(f"Unknown model type: {model_type}")
return None
model_path = self.cache_dir / model_mapping[model_type]
if not model_path.exists():
logger.warning(f"Model {model_type} not found at {model_path}")
return None
return model_path
def are_models_cached(self) -> bool:
"""Check if all required models are cached.
Returns:
True if all required models are cached, False otherwise
"""
layout_path = self.get_model_path('layout')
table_path = self.get_model_path('table')
# Only check for Nanonets model if GPU is available
if is_gpu_available():
nanonets_path = self.get_model_path('nanonets-ocr')
return layout_path is not None and table_path is not None and nanonets_path is not None
else:
return layout_path is not None and table_path is not None
def _download_from_s3(self, s3_url: str, local_dir: Path, force: bool, progress: bool):
"""Download model from Nanonets S3.
Args:
s3_url: S3 URL of the model archive
local_dir: Local directory to extract model
force: Force re-download
progress: Show progress
"""
import tarfile
import tempfile
# Download the tar.gz file
response = requests.get(s3_url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as tmp_file:
if progress and total_size > 0:
with tqdm(total=total_size, unit='B', unit_scale=True, desc="Downloading") as pbar:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmp_file.write(chunk)
pbar.update(len(chunk))
else:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmp_file.write(chunk)
tmp_file_path = tmp_file.name
try:
# Extract the archive
logger.info(f"Extracting model to {local_dir}")
with tarfile.open(tmp_file_path, 'r:gz') as tar:
tar.extractall(path=local_dir)
logger.info("Model extraction completed successfully")
finally:
# Clean up temporary file
try:
os.unlink(tmp_file_path)
except:
pass
def get_cache_info(self) -> dict:
"""Get information about cached models.
Returns:
Dictionary with cache information
"""
info = {
'cache_dir': str(self.cache_dir),
'gpu_info': get_gpu_info(),
'models': {}
}
# Always check layout and table models
for model_type in ['layout', 'table']:
path = self.get_model_path(model_type)
info['models'][model_type] = {
'cached': path is not None,
'path': str(path) if path else None
}
# Only check Nanonets model if GPU is available
if is_gpu_available():
path = self.get_model_path('nanonets-ocr')
info['models']['nanonets-ocr'] = {
'cached': path is not None,
'path': str(path) if path else None,
'gpu_required': True
}
else:
info['models']['nanonets-ocr'] = {
'cached': False,
'path': None,
'gpu_required': True,
'skipped': 'No GPU available'
}
return info
```
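A sketch of pre-fetching the models outside of normal processing, for example to warm the cache in a build step; the default cache location is `~/.cache/docstrange/models`.

```python
# Sketch: warm the model cache ahead of time and inspect what is present.
from docstrange.pipeline.model_downloader import ModelDownloader

downloader = ModelDownloader()          # or ModelDownloader(cache_dir=some_path)
if not downloader.are_models_cached():
    downloader.download_models(force=False, progress=True)

info = downloader.get_cache_info()
for name, details in info["models"].items():
    status = "cached" if details["cached"] else "missing"
    print(f"{name}: {status} ({details['path']})")
```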
## /docstrange/pipeline/nanonets_processor.py
```py path="/docstrange/pipeline/nanonets_processor.py"
"""Neural Document Processor using Nanonets OCR for superior document understanding."""
import logging
import os
from typing import Optional
from pathlib import Path
from PIL import Image
logger = logging.getLogger(__name__)
class NanonetsDocumentProcessor:
"""Neural Document Processor using Nanonets OCR model."""
def __init__(self, cache_dir: Optional[Path] = None):
"""Initialize the Neural Document Processor with Nanonets OCR."""
logger.info("Initializing Neural Document Processor with Nanonets OCR...")
# Initialize models
self._initialize_models(cache_dir)
logger.info("Neural Document Processor initialized successfully")
def _initialize_models(self, cache_dir: Optional[Path] = None):
"""Initialize Nanonets OCR model from local cache."""
try:
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText
from .model_downloader import ModelDownloader
# Get model downloader instance
model_downloader = ModelDownloader(cache_dir)
# Get the path to the locally cached Nanonets model
model_path = model_downloader.get_model_path('nanonets-ocr')
if model_path is None:
raise RuntimeError(
"Failed to download Nanonets OCR model. "
"Please ensure you have sufficient disk space and internet connection."
)
# The actual model files are in a subdirectory with the same name
actual_model_path = model_path / "Nanonets-OCR-s"
if not actual_model_path.exists():
raise RuntimeError(
f"Model files not found at expected path: {actual_model_path}"
)
logger.info(f"Loading Nanonets OCR model from local cache: {actual_model_path}")
# Load model from local path
self.model = AutoModelForImageTextToText.from_pretrained(
str(actual_model_path),
torch_dtype="auto",
device_map="auto",
local_files_only=True # Use only local files
)
self.model.eval()
self.tokenizer = AutoTokenizer.from_pretrained(
str(actual_model_path),
local_files_only=True
)
self.processor = AutoProcessor.from_pretrained(
str(actual_model_path),
local_files_only=True
)
logger.info("Nanonets OCR model loaded successfully from local cache")
except ImportError as e:
logger.error(f"Transformers library not available: {e}")
raise ImportError(
"Transformers library is required for Nanonets OCR. "
"Please install it: pip install transformers"
)
except Exception as e:
logger.error(f"Failed to initialize Nanonets OCR model: {e}")
raise
def extract_text(self, image_path: str) -> str:
"""Extract text from image using Nanonets OCR."""
try:
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
return self._extract_text_with_nanonets(image_path)
except Exception as e:
logger.error(f"Nanonets OCR extraction failed: {e}")
return ""
def extract_text_with_layout(self, image_path: str) -> str:
"""Extract text with layout awareness using Nanonets OCR.
Note: Nanonets OCR already provides layout-aware extraction,
so this method returns the same result as extract_text().
"""
return self.extract_text(image_path)
def _extract_text_with_nanonets(self, image_path: str, max_new_tokens: int = 4096) -> str:
"""Extract text using Nanonets OCR model."""
try:
prompt = """Extract the text from the above document as if you were reading it naturally. Return the tables in html format. Return the equations in LaTeX representation. If there is an image in the document and image caption is not present, add a small description of the image inside the <img></img> tag; otherwise, add the image caption inside <img></img>. Watermarks should be wrapped in brackets. Ex: <watermark>OFFICIAL COPY</watermark>. Page numbers should be wrapped in brackets. Ex: <page_number>14</page_number> or <page_number>9/22</page_number>. Prefer using ☐ and ☑ for check boxes."""
image = Image.open(image_path)
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": [
{"type": "image", "image": f"file://{image_path}"},
{"type": "text", "text": prompt},
]},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = self.processor(text=[text], images=[image], padding=True, return_tensors="pt")
inputs = inputs.to(self.model.device)
output_ids = self.model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)
generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, output_ids)]
output_text = self.processor.batch_decode(generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)
return output_text[0]
except Exception as e:
logger.error(f"Nanonets OCR extraction failed: {e}")
return ""
def __del__(self):
"""Cleanup resources."""
pass
```
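A usage sketch for the Nanonets OCR path; it assumes a GPU-capable environment with `transformers` installed and the Nanonets-OCR model already downloaded into the cache. The image path is a placeholder.

```python
# Sketch: direct use of the Nanonets OCR processor (GPU + transformers + cached model assumed).
from docstrange.pipeline.nanonets_processor import NanonetsDocumentProcessor

processor = NanonetsDocumentProcessor()             # loads the model from the local cache
text = processor.extract_text("scanned_page.png")   # placeholder image path
print(text[:500])
```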
## /docstrange/pipeline/neural_document_processor.py
```py path="/docstrange/pipeline/neural_document_processor.py"
"""Neural Document Processor using docling's pre-trained models for superior document understanding."""
import logging
import os
import platform
import sys
from typing import Optional, List, Dict, Any, Tuple
from pathlib import Path
from PIL import Image
import numpy as np
# macOS-specific NumPy compatibility fix
if platform.system() == "Darwin":
try:
import numpy as np
# Check if we're on NumPy 2.x
if hasattr(np, '__version__') and np.__version__.startswith('2'):
# Set environment variable to use NumPy 1.x compatibility mode
os.environ['NUMPY_EXPERIMENTAL_ARRAY_FUNCTION'] = '0'
# Also set this for PyTorch compatibility
os.environ['PYTORCH_NUMPY_COMPATIBILITY'] = '1'
logger = logging.getLogger(__name__)
logger.warning(
"NumPy 2.x detected on macOS. This may cause compatibility issues. "
"Consider downgrading to NumPy 1.x: pip install 'numpy<2.0.0'"
)
except ImportError:
pass
# Runtime NumPy version check
def _check_numpy_version():
"""Check NumPy version and warn about compatibility issues."""
try:
import numpy as np
version = np.__version__
if version.startswith('2'):
logger = logging.getLogger(__name__)
logger.error(
f"NumPy {version} detected. This library requires NumPy 1.x for compatibility "
"with docling models. Please downgrade NumPy:\n"
"pip install 'numpy<2.0.0'\n"
"or\n"
"pip install --upgrade llm-data-extractor"
)
if platform.system() == "Darwin":
logger.error(
"On macOS, NumPy 2.x is known to cause crashes with PyTorch. "
"Downgrading to NumPy 1.x is strongly recommended."
)
return False
return True
except ImportError:
return True
from .model_downloader import ModelDownloader
from .layout_detector import LayoutDetector
logger = logging.getLogger(__name__)
class NeuralDocumentProcessor:
"""Neural Document Processor using docling's pre-trained models."""
def __init__(self, cache_dir: Optional[Path] = None):
"""Initialize the Neural Document Processor."""
logger.info("Initializing Neural Document Processor...")
# Check NumPy version compatibility
if not _check_numpy_version():
raise RuntimeError(
"Incompatible NumPy version detected. Please downgrade to NumPy 1.x: "
"pip install 'numpy<2.0.0'"
)
# Initialize model downloader
self.model_downloader = ModelDownloader(cache_dir)
# Initialize layout detector
self.layout_detector = LayoutDetector()
# Initialize models
self._initialize_models()
logger.info("Neural Document Processor initialized successfully")
def _initialize_models(self):
"""Initialize all required models."""
try:
# Initialize model paths
self._initialize_model_paths()
# Initialize docling neural models
self._initialize_docling_models()
except Exception as e:
logger.error(f"Failed to initialize models: {e}")
raise
def _initialize_model_paths(self):
"""Initialize paths to downloaded models."""
from .model_downloader import ModelDownloader
downloader = ModelDownloader()
# Check if models exist, if not download them
layout_path = downloader.get_model_path('layout')
table_path = downloader.get_model_path('table')
# If any model is missing, download all models
if not layout_path or not table_path:
logger.info("Some models are missing. Downloading all required models...")
logger.info(f"Models will be cached at: {downloader.cache_dir}")
try:
downloader.download_models(force=False, progress=True)
# Get paths again after download
layout_path = downloader.get_model_path('layout')
table_path = downloader.get_model_path('table')
# Check if download was successful
if layout_path and table_path:
logger.info("Model download completed successfully!")
else:
logger.warning("Some models may not have downloaded successfully due to authentication issues.")
logger.info("Falling back to basic document processing without advanced neural models.")
# Set flags to indicate fallback mode
self._use_fallback_mode = True
return
except Exception as e:
logger.warning(f"Failed to download models: {e}")
if "401" in str(e) or "Unauthorized" in str(e) or "Authentication" in str(e):
logger.info(
"Model download failed due to authentication. Using basic document processing.\n"
"For enhanced features, please set up Hugging Face authentication:\n"
"1. Create account at https://huggingface.co/\n"
"2. Generate token at https://huggingface.co/settings/tokens\n"
"3. Run: huggingface-cli login"
)
self._use_fallback_mode = True
return
else:
raise ValueError(f"Failed to download required models: {e}")
else:
logger.info("All required models found in cache.")
# Set fallback mode flag
self._use_fallback_mode = False
# Set model paths
self.layout_model_path = layout_path
self.table_model_path = table_path
if not self.layout_model_path or not self.table_model_path:
if hasattr(self, '_use_fallback_mode') and self._use_fallback_mode:
logger.info("Running in fallback mode without advanced neural models")
return
else:
raise ValueError("One or more required models not found")
# The models are downloaded with the full repository structure
# The entire repo is downloaded to each cache folder, so we need to navigate to the specific model paths
# Layout model is in layout/model_artifacts/layout/
# Table model is in tableformer/model_artifacts/tableformer/accurate/
# Note: EasyOCR downloads its own models automatically
# Check if the expected structure exists, if not use the cache folder directly
layout_artifacts = self.layout_model_path / "model_artifacts" / "layout"
table_artifacts = self.table_model_path / "model_artifacts" / "tableformer" / "accurate"
if layout_artifacts.exists():
self.layout_model_path = layout_artifacts
else:
# Fallback: use the cache folder directly
logger.warning(f"Expected layout model structure not found, using cache folder directly")
if table_artifacts.exists():
self.table_model_path = table_artifacts
else:
# Fallback: use the cache folder directly
logger.warning(f"Expected table model structure not found, using cache folder directly")
logger.info(f"Layout model path: {self.layout_model_path}")
logger.info(f"Table model path: {self.table_model_path}")
logger.info("EasyOCR will download its own models automatically")
# Verify model files exist (with more flexible checking)
layout_model_file = self.layout_model_path / "model.safetensors"
table_config_file = self.table_model_path / "tm_config.json"
if not layout_model_file.exists():
# Try alternative locations
alt_layout_file = self.layout_model_path / "layout" / "model.safetensors"
if alt_layout_file.exists():
self.layout_model_path = self.layout_model_path / "layout"
layout_model_file = alt_layout_file
else:
raise FileNotFoundError(f"Missing layout model file. Checked: {layout_model_file}, {alt_layout_file}")
if not table_config_file.exists():
# Try alternative locations
alt_table_file = self.table_model_path / "tableformer" / "accurate" / "tm_config.json"
if alt_table_file.exists():
self.table_model_path = self.table_model_path / "tableformer" / "accurate"
table_config_file = alt_table_file
else:
raise FileNotFoundError(f"Missing table config file. Checked: {table_config_file}, {alt_table_file}")
def _initialize_docling_models(self):
"""Initialize docling's pre-trained models."""
# Check if we're in fallback mode
if hasattr(self, '_use_fallback_mode') and self._use_fallback_mode:
logger.info("Skipping docling models initialization - running in fallback mode")
self.use_advanced_models = False
self.layout_predictor = None
self.table_predictor = None
self.ocr_reader = None
return
try:
# Import docling models
from docling_ibm_models.layoutmodel.layout_predictor import LayoutPredictor
from docling_ibm_models.tableformer.common import read_config
from docling_ibm_models.tableformer.data_management.tf_predictor import TFPredictor
import easyocr
# Initialize layout model
self.layout_predictor = LayoutPredictor(
artifact_path=str(self.layout_model_path),
device='cpu',
num_threads=4
)
# Initialize table structure model
tm_config = read_config(str(self.table_model_path / "tm_config.json"))
tm_config["model"]["save_dir"] = str(self.table_model_path)
self.table_predictor = TFPredictor(tm_config, 'cpu', 4)
# Initialize OCR model
self.ocr_reader = easyocr.Reader(['en'])
self.use_advanced_models = True
logger.info("Docling neural models initialized successfully")
except ImportError as e:
logger.error(f"Docling models not available: {e}")
raise
except Exception as e:
error_msg = str(e)
if "NumPy" in error_msg or "numpy" in error_msg.lower():
logger.error(
f"NumPy compatibility error: {error_msg}\n"
"This is likely due to NumPy 2.x incompatibility. Please downgrade:\n"
"pip install 'numpy<2.0.0'"
)
if platform.system() == "Darwin":
logger.error(
"On macOS, NumPy 2.x is known to cause crashes with PyTorch. "
"Downgrading to NumPy 1.x is required."
)
else:
logger.error(f"Failed to initialize docling models: {e}")
raise
def extract_text(self, image_path: str) -> str:
"""Extract text from image using neural OCR."""
try:
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
return self._extract_text_advanced(image_path)
except Exception as e:
logger.error(f"OCR extraction failed: {e}")
return ""
def extract_text_with_layout(self, image_path: str) -> str:
"""Extract text with layout awareness using neural models."""
try:
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
return self._extract_text_with_layout_advanced(image_path)
except Exception as e:
logger.error(f"Layout-aware OCR extraction failed: {e}")
return ""
def _extract_text_advanced(self, image_path: str) -> str:
"""Extract text using docling's advanced models."""
try:
with Image.open(image_path) as img:
if img.mode != 'RGB':
img = img.convert('RGB')
results = self.ocr_reader.readtext(img)
texts = []
for (bbox, text, confidence) in results:
if confidence > 0.5:
texts.append(text)
return ' '.join(texts)
except Exception as e:
logger.error(f"Advanced OCR extraction failed: {e}")
return ""
def _extract_text_with_layout_advanced(self, image_path: str) -> str:
"""Extract text with layout awareness using docling's neural models."""
try:
with Image.open(image_path) as img:
if img.mode != 'RGB':
img = img.convert('RGB')
# Get layout predictions using neural model
layout_results = list(self.layout_predictor.predict(img))
# Process layout results and extract text
text_blocks = []
table_blocks = []
for pred in layout_results:
label = pred.get('label', '').lower().replace(' ', '_').replace('-', '_')
# Construct bbox from l, t, r, b
if all(k in pred for k in ['l', 't', 'r', 'b']):
bbox = [pred['l'], pred['t'], pred['r'], pred['b']]
else:
bbox = pred.get('bbox') or pred.get('box')
if not bbox:
continue
# Extract text from this region using OCR
region_text = self._extract_text_from_region(img, bbox)
if not region_text or pred.get('confidence', 1.0) < 0.5:
continue
from .layout_detector import LayoutElement
# Handle different element types
if label in ['table', 'document_index']:
# Process tables separately
table_blocks.append({
'text': region_text,
'bbox': bbox,
'label': label,
'confidence': pred.get('confidence', 1.0)
})
elif label in ['title', 'section_header', 'subtitle_level_1']:
# Headers
text_blocks.append(LayoutElement(
text=region_text,
x=bbox[0],
y=bbox[1],
width=bbox[2] - bbox[0],
height=bbox[3] - bbox[1],
element_type='heading',
confidence=pred.get('confidence', 1.0)
))
elif label in ['list_item']:
# List items
text_blocks.append(LayoutElement(
text=region_text,
x=bbox[0],
y=bbox[1],
width=bbox[2] - bbox[0],
height=bbox[3] - bbox[1],
element_type='list_item',
confidence=pred.get('confidence', 1.0)
))
else:
# Regular text/paragraphs
text_blocks.append(LayoutElement(
text=region_text,
x=bbox[0],
y=bbox[1],
width=bbox[2] - bbox[0],
height=bbox[3] - bbox[1],
element_type='paragraph',
confidence=pred.get('confidence', 1.0)
))
# Sort by position (top to bottom, left to right)
text_blocks.sort(key=lambda x: (x.y, x.x))
# Process tables using table structure model
processed_tables = self._process_tables_with_structure_model(img, table_blocks)
# Convert to markdown with proper structure
return self._convert_to_structured_markdown_advanced(text_blocks, processed_tables, img.size)
except Exception as e:
logger.error(f"Advanced layout-aware OCR failed: {e}")
return ""
def _process_tables_with_structure_model(self, img: Image.Image, table_blocks: List[Dict]) -> List[Dict]:
"""Process tables using the table structure model."""
processed_tables = []
for table_block in table_blocks:
try:
# Extract table region
bbox = table_block['bbox']
x1, y1, x2, y2 = bbox
table_region = img.crop((x1, y1, x2, y2))
# Convert to numpy array
table_np = np.array(table_region)
# Create page input in the format expected by docling table structure model
page_input = {
"width": table_np.shape[1],
"height": table_np.shape[0],
"image": table_np,
"tokens": [] # Empty tokens since we're not using cell matching
}
# The bbox coordinates should be relative to the table region
table_bbox = [0, 0, x2-x1, y2-y1]
# Predict table structure
tf_output = self.table_predictor.multi_table_predict(page_input, [table_bbox], do_matching=False)
table_out = tf_output[0] if isinstance(tf_output, list) else tf_output
# Extract table data
table_data = []
tf_responses = table_out.get("tf_responses", []) if isinstance(table_out, dict) else []
for element in tf_responses:
if isinstance(element, dict) and "bbox" in element:
cell_bbox = element["bbox"]
# Handle bbox as dict with keys l, t, r, b
if isinstance(cell_bbox, dict) and all(k in cell_bbox for k in ["l", "t", "r", "b"]):
cell_x1 = cell_bbox["l"]
cell_y1 = cell_bbox["t"]
cell_x2 = cell_bbox["r"]
cell_y2 = cell_bbox["b"]
cell_region = table_region.crop((cell_x1, cell_y1, cell_x2, cell_y2))
cell_np = np.array(cell_region)
cell_text = self._extract_text_from_region_numpy(cell_np)
table_data.append(cell_text)
elif isinstance(cell_bbox, list) and len(cell_bbox) == 4:
cell_x1, cell_y1, cell_x2, cell_y2 = cell_bbox
cell_region = table_region.crop((cell_x1, cell_y1, cell_x2, cell_y2))
cell_np = np.array(cell_region)
cell_text = self._extract_text_from_region_numpy(cell_np)
table_data.append(cell_text)
else:
pass
else:
pass
# Organize table data into rows and columns
processed_table = self._organize_table_data(table_data, table_out if isinstance(table_out, dict) else {})
# Preserve the original bbox from the table block
processed_table['bbox'] = table_block['bbox']
processed_tables.append(processed_table)
except Exception as e:
logger.error(f"Failed to process table: {e}")
# Fallback to simple table extraction
processed_tables.append({
'type': 'simple_table',
'text': table_block['text'],
'bbox': table_block['bbox']
})
return processed_tables
def _extract_text_from_region_numpy(self, region_np: np.ndarray) -> str:
"""Extract text from numpy array region."""
try:
results = self.ocr_reader.readtext(region_np)
texts = []
for (_, text, confidence) in results:
if confidence > 0.5:
texts.append(text)
return ' '.join(texts)
except Exception as e:
logger.error(f"Failed to extract text from numpy region: {e}")
return ""
def _organize_table_data(self, table_data: list, table_out: dict) -> dict:
"""Organize table data into proper structure using row/col indices from tf_responses."""
try:
tf_responses = table_out.get("tf_responses", []) if isinstance(table_out, dict) else []
num_rows = table_out.get("predict_details", {}).get("num_rows", 0)
num_cols = table_out.get("predict_details", {}).get("num_cols", 0)
# Build empty grid
grid = [["" for _ in range(num_cols)] for _ in range(num_rows)]
# Place cell texts in the correct grid positions
for idx, element in enumerate(tf_responses):
row = element.get("start_row_offset_idx", 0)
col = element.get("start_col_offset_idx", 0)
# Use the extracted text if available, else fallback to element text
text = table_data[idx] if idx < len(table_data) else element.get("text", "")
grid[row][col] = text
return {
'type': 'structured_table',
'grid': grid,
'num_rows': num_rows,
'num_cols': num_cols
}
except Exception as e:
logger.error(f"Failed to organize table data: {e}")
return {
'type': 'simple_table',
'data': table_data
}
def _convert_table_to_markdown(self, table: dict) -> str:
"""Convert structured table to markdown format."""
if table['type'] != 'structured_table':
return f"**Table:** {table.get('text', '')}"
grid = table['grid']
if not grid or not grid[0]:
return ""
# Find the first non-empty row to use as header
header_row = None
for row in grid:
if any(cell.strip() for cell in row):
header_row = row
break
if not header_row:
return ""
# Use the header row as is (preserve all columns)
header_cells = [cell.strip() if cell else "" for cell in header_row]
markdown_lines = []
markdown_lines.append("| " + " | ".join(header_cells) + " |")
markdown_lines.append("|" + "|".join(["---"] * len(header_cells)) + "|")
# Add data rows (skip the header row)
header_index = grid.index(header_row)
for row in grid[header_index + 1:]:
cells = [cell.strip() if cell else "" for cell in row]
markdown_lines.append("| " + " | ".join(cells) + " |")
return '\n'.join(markdown_lines)
def _convert_to_structured_markdown_advanced(self, text_blocks: List, processed_tables: List[Dict], img_size: Tuple[int, int]) -> str:
"""Convert text blocks and tables to structured markdown."""
markdown_parts = []
# Sort all elements by position
all_elements = []
# Add text blocks
for block in text_blocks:
all_elements.append({
'type': 'text',
'element': block,
'y': block.y,
'x': block.x
})
# Add tables
for table in processed_tables:
if 'bbox' in table:
all_elements.append({
'type': 'table',
'element': table,
'y': table['bbox'][1],
'x': table['bbox'][0]
})
else:
logger.warning(f"Table has no bbox, skipping: {table}")
# Sort by position
all_elements.sort(key=lambda x: (x['y'], x['x']))
# Convert to markdown
for element in all_elements:
if element['type'] == 'text':
block = element['element']
text = block.text.strip()
if not text:
continue
if block.element_type == 'heading':
# Determine heading level based on font size/position
level = self._determine_heading_level(block)
markdown_parts.append(f"{'#' * level} {text}")
markdown_parts.append("")
elif block.element_type == 'list_item':
markdown_parts.append(f"- {text}")
else:
markdown_parts.append(text)
markdown_parts.append("")
elif element['type'] == 'table':
table = element['element']
if table['type'] == 'structured_table':
# Convert structured table to markdown
table_md = self._convert_table_to_markdown(table)
markdown_parts.append(table_md)
markdown_parts.append("")
else:
# Simple table
markdown_parts.append(f"**Table:** {table.get('text', '')}")
markdown_parts.append("")
return '\n'.join(markdown_parts)
def _determine_heading_level(self, block) -> int:
"""Determine heading level based on font size and position."""
# Simple heuristic: larger text or positioned at top = higher level
if block.y < 100: # Near top of page
return 1
elif block.height > 30: # Large text
return 2
else:
return 3
def _extract_text_from_region(self, img: Image.Image, bbox: List[float]) -> str:
"""Extract text from a specific region of the image."""
try:
# Crop the region
x1, y1, x2, y2 = bbox
region = img.crop((x1, y1, x2, y2))
# Convert PIL image to numpy array for easyocr
region_np = np.array(region)
# Use OCR on the region
results = self.ocr_reader.readtext(region_np)
texts = []
for (_, text, confidence) in results:
if confidence > 0.5:
texts.append(text)
return ' '.join(texts)
except Exception as e:
logger.error(f"Failed to extract text from region: {e}")
return ""
def __del__(self):
"""Cleanup resources."""
pass
```
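The structured-table conversion above (first non-empty row as header, remaining rows as data) is easy to sanity-check in isolation. A minimal sketch of the same logic on a plain list-of-lists grid; the helper name `grid_to_markdown` is illustrative and not part of the library:

```python
from typing import List

def grid_to_markdown(grid: List[List[str]]) -> str:
    """Mirror of _convert_table_to_markdown: first non-empty row becomes the header."""
    header = next((row for row in grid if any(cell.strip() for cell in row)), None)
    if header is None:
        return ""
    lines = ["| " + " | ".join(cell.strip() for cell in header) + " |"]
    lines.append("|" + "|".join(["---"] * len(header)) + "|")
    for row in grid[grid.index(header) + 1:]:
        lines.append("| " + " | ".join(cell.strip() for cell in row) + " |")
    return "\n".join(lines)

print(grid_to_markdown([["Name", "Qty"], ["Widget", "2"], ["Gadget", "5"]]))
# | Name | Qty |
# |---|---|
# | Widget | 2 |
# | Gadget | 5 |
```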
## /docstrange/pipeline/ocr_service.py
```py path="/docstrange/pipeline/ocr_service.py"
"""OCR Service abstraction for neural document processing."""
import os
import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class OCRService(ABC):
"""Abstract base class for OCR services."""
@abstractmethod
def extract_text(self, image_path: str) -> str:
"""Extract text from image.
Args:
image_path: Path to the image file
Returns:
Extracted text as string
"""
pass
@abstractmethod
def extract_text_with_layout(self, image_path: str) -> str:
"""Extract text with layout awareness from image.
Args:
image_path: Path to the image file
Returns:
Layout-aware extracted text as markdown
"""
pass
class NanonetsOCRService(OCRService):
"""Nanonets OCR implementation using NanonetsDocumentProcessor."""
def __init__(self):
"""Initialize the service."""
from .nanonets_processor import NanonetsDocumentProcessor
self._processor = NanonetsDocumentProcessor()
logger.info("NanonetsOCRService initialized")
@property
def model(self):
"""Get the Nanonets model."""
return self._processor.model
@property
def processor(self):
"""Get the Nanonets processor."""
return self._processor.processor
@property
def tokenizer(self):
"""Get the Nanonets tokenizer."""
return self._processor.tokenizer
def extract_text(self, image_path: str) -> str:
"""Extract text using Nanonets OCR."""
try:
# Validate image file
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
# Check if file is readable
try:
from PIL import Image
with Image.open(image_path) as img:
logger.info(f"Image loaded successfully: {img.size} {img.mode}")
except Exception as e:
logger.error(f"Failed to load image: {e}")
return ""
try:
text = self._processor.extract_text(image_path)
logger.info(f"Extracted text length: {len(text)}")
return text.strip()
except Exception as e:
logger.error(f"Nanonets OCR extraction failed: {e}")
return ""
except Exception as e:
logger.error(f"Nanonets OCR extraction failed: {e}")
return ""
def extract_text_with_layout(self, image_path: str) -> str:
"""Extract text with layout awareness using Nanonets OCR."""
try:
# Validate image file
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
# Check if file is readable
try:
from PIL import Image
with Image.open(image_path) as img:
logger.info(f"Image loaded successfully: {img.size} {img.mode}")
except Exception as e:
logger.error(f"Failed to load image: {e}")
return ""
try:
text = self._processor.extract_text_with_layout(image_path)
logger.info(f"Layout-aware extracted text length: {len(text)}")
return text.strip()
except Exception as e:
logger.error(f"Nanonets OCR layout-aware extraction failed: {e}")
return ""
except Exception as e:
logger.error(f"Nanonets OCR layout-aware extraction failed: {e}")
return ""
class NeuralOCRService(OCRService):
"""Neural OCR implementation using docling's pre-trained models."""
def __init__(self):
"""Initialize the service."""
from .neural_document_processor import NeuralDocumentProcessor
self._processor = NeuralDocumentProcessor()
logger.info("NeuralOCRService initialized")
def extract_text(self, image_path: str) -> str:
"""Extract text using Neural OCR (docling models)."""
try:
# Validate image file
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
# Check if file is readable
try:
from PIL import Image
with Image.open(image_path) as img:
logger.info(f"Image loaded successfully: {img.size} {img.mode}")
except Exception as e:
logger.error(f"Failed to load image: {e}")
return ""
try:
text = self._processor.extract_text(image_path)
logger.info(f"Extracted text length: {len(text)}")
return text.strip()
except Exception as e:
logger.error(f"Neural OCR extraction failed: {e}")
return ""
except Exception as e:
logger.error(f"Neural OCR extraction failed: {e}")
return ""
def extract_text_with_layout(self, image_path: str) -> str:
"""Extract text with layout awareness using Neural OCR."""
try:
# Validate image file
if not os.path.exists(image_path):
logger.error(f"Image file does not exist: {image_path}")
return ""
# Check if file is readable
try:
from PIL import Image
with Image.open(image_path) as img:
logger.info(f"Image loaded successfully: {img.size} {img.mode}")
except Exception as e:
logger.error(f"Failed to load image: {e}")
return ""
try:
text = self._processor.extract_text_with_layout(image_path)
logger.info(f"Layout-aware extracted text length: {len(text)}")
return text.strip()
except Exception as e:
logger.error(f"Neural OCR layout-aware extraction failed: {e}")
return ""
except Exception as e:
logger.error(f"Neural OCR layout-aware extraction failed: {e}")
return ""
class OCRServiceFactory:
"""Factory for creating OCR services based on configuration."""
@staticmethod
def create_service(provider: str = None) -> OCRService:
"""Create OCR service based on provider configuration.
Args:
provider: OCR provider name (defaults to config)
Returns:
OCRService instance
"""
from docstrange.config import InternalConfig
if provider is None:
provider = getattr(InternalConfig, 'ocr_provider', 'nanonets')
if provider.lower() == 'nanonets':
return NanonetsOCRService()
elif provider.lower() == 'neural':
return NeuralOCRService()
else:
raise ValueError(f"Unsupported OCR provider: {provider}")
@staticmethod
def get_available_providers() -> List[str]:
"""Get list of available OCR providers.
Returns:
List of available provider names
"""
return ['nanonets', 'neural']
```
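A short usage sketch of the factory. It assumes the model dependencies are installed (constructing a service loads the underlying processor) and that `invoice.png` is a placeholder path; omitting the provider argument falls back to `InternalConfig.ocr_provider`:

```python
from docstrange.pipeline.ocr_service import OCRServiceFactory

print(OCRServiceFactory.get_available_providers())  # ['nanonets', 'neural']

# Pick a provider explicitly; create_service() with no argument uses the configured default.
ocr = OCRServiceFactory.create_service("nanonets")

# Both methods log and return "" on failure instead of raising.
plain_text = ocr.extract_text("invoice.png")             # placeholder image path
layout_md = ocr.extract_text_with_layout("invoice.png")  # markdown-style output
print(layout_md[:200])
```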
## /docstrange/processors/__init__.py
```py path="/docstrange/processors/__init__.py"
"""Processors for different file types."""
from .pdf_processor import PDFProcessor
from .docx_processor import DOCXProcessor
from .txt_processor import TXTProcessor
from .excel_processor import ExcelProcessor
from .url_processor import URLProcessor
from .html_processor import HTMLProcessor
from .pptx_processor import PPTXProcessor
from .image_processor import ImageProcessor
from .cloud_processor import CloudProcessor, CloudConversionResult
from .gpu_processor import GPUProcessor, GPUConversionResult
__all__ = [
"PDFProcessor",
"DOCXProcessor",
"TXTProcessor",
"ExcelProcessor",
"URLProcessor",
"HTMLProcessor",
"PPTXProcessor",
"ImageProcessor",
"CloudProcessor",
"CloudConversionResult",
"GPUProcessor",
"GPUConversionResult"
]
```
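The package exports make it easy to pick a processor by probing `can_process`. A sketch under the assumption that `report.xlsx` is a placeholder path; the heavier OCR-backed processors are left out so nothing downloads models just to probe extensions:

```python
from docstrange.processors import (
    DOCXProcessor, ExcelProcessor, HTMLProcessor, PPTXProcessor, TXTProcessor,
)

def pick_processor(path: str):
    """Return the first lightweight processor that claims the file, or None."""
    for cls in (DOCXProcessor, ExcelProcessor, HTMLProcessor, PPTXProcessor, TXTProcessor):
        processor = cls()
        if processor.can_process(path):
            return processor
    return None

processor = pick_processor("report.xlsx")   # placeholder path
if processor is not None:
    print(processor.process("report.xlsx").content)
```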
## /docstrange/processors/base.py
```py path="/docstrange/processors/base.py"
"""Base processor class for document conversion."""
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from ..result import ConversionResult
from docstrange.config import InternalConfig
import logging
import os

logger = logging.getLogger(__name__)
class BaseProcessor(ABC):
"""Base class for all document processors."""
def __init__(self, preserve_layout: bool = True, include_images: bool = False, ocr_enabled: bool = True, use_markdownify: bool = InternalConfig.use_markdownify):
"""Initialize the processor.
Args:
preserve_layout: Whether to preserve document layout
include_images: Whether to include images in output
ocr_enabled: Whether to enable OCR for image processing
use_markdownify: Whether to use markdownify for HTML->Markdown conversion
"""
self.preserve_layout = preserve_layout
self.include_images = include_images
self.ocr_enabled = ocr_enabled
self.use_markdownify = use_markdownify
@abstractmethod
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
pass
@abstractmethod
def process(self, file_path: str) -> ConversionResult:
"""Process the file and return a conversion result.
Args:
file_path: Path to the file to process
Returns:
ConversionResult containing the processed content
Raises:
ConversionError: If processing fails
"""
pass
def get_metadata(self, file_path: str) -> Dict[str, Any]:
"""Get metadata about the file.
Args:
file_path: Path to the file
Returns:
Dictionary containing file metadata
"""
try:
            file_stat = os.stat(file_path)
            # Ensure file_path is a string for splitext
            file_path_str = str(file_path)
            return {
                "file_size": file_stat.st_size,
"file_extension": os.path.splitext(file_path_str)[1].lower(),
"file_name": os.path.basename(file_path_str),
"processor": self.__class__.__name__,
"preserve_layout": self.preserve_layout,
"include_images": self.include_images,
"ocr_enabled": self.ocr_enabled
}
except Exception as e:
logger.warning(f"Failed to get metadata for {file_path}: {e}")
return {
"processor": self.__class__.__name__,
"preserve_layout": self.preserve_layout,
"include_images": self.include_images,
"ocr_enabled": self.ocr_enabled
}
```
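Subclassing `BaseProcessor` only requires the two abstract hooks. A minimal sketch; the `.log` handling and class name are invented for illustration and not part of the library:

```python
import os

from docstrange.processors.base import BaseProcessor
from docstrange.result import ConversionResult

class LogProcessor(BaseProcessor):
    """Hypothetical processor that turns .log files into a markdown bullet list."""

    def can_process(self, file_path: str) -> bool:
        return os.path.splitext(str(file_path))[1].lower() == ".log"

    def process(self, file_path: str) -> ConversionResult:
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            lines = [line.rstrip() for line in f if line.strip()]
        content = "\n".join(f"- {line}" for line in lines)
        return ConversionResult(content, self.get_metadata(file_path))

# Usage sketch: LogProcessor().process("/var/log/app.log")  # placeholder path
```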
## /docstrange/processors/cloud_processor.py
```py path="/docstrange/processors/cloud_processor.py"
"""Cloud processor for Nanonets API integration."""
import os
import requests
import json
import logging
from typing import Dict, Any, Optional
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError
logger = logging.getLogger(__name__)
class CloudConversionResult(ConversionResult):
"""Enhanced ConversionResult for cloud mode with lazy API calls."""
def __init__(self, file_path: str, cloud_processor: 'CloudProcessor', metadata: Optional[Dict[str, Any]] = None):
# Initialize with empty content - we'll make API calls on demand
super().__init__("", metadata)
self.file_path = file_path
self.cloud_processor = cloud_processor
self._cached_outputs = {} # Cache API responses by output type
def _get_cloud_output(self, output_type: str, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> str:
"""Get output from cloud API for specific type, with caching."""
# Validate output type
valid_output_types = ["markdown", "flat-json", "html", "csv", "specified-fields", "specified-json"]
if output_type not in valid_output_types:
logger.warning(f"Invalid output type '{output_type}' for cloud API. Using 'markdown'.")
output_type = "markdown"
# Create cache key based on output type and parameters
cache_key = output_type
if specified_fields:
cache_key += f"_fields_{','.join(specified_fields)}"
if json_schema:
cache_key += f"_schema_{hash(str(json_schema))}"
if cache_key in self._cached_outputs:
return self._cached_outputs[cache_key]
try:
# Prepare headers - API key is optional
headers = {}
if self.cloud_processor.api_key:
headers['Authorization'] = f'Bearer {self.cloud_processor.api_key}'
# Prepare file for upload
with open(self.file_path, 'rb') as file:
files = {
'file': (os.path.basename(self.file_path), file, self.cloud_processor._get_content_type(self.file_path))
}
data = {
'output_type': output_type
}
# Add model_type if specified
if self.cloud_processor.model_type:
data['model_type'] = self.cloud_processor.model_type
# Add field extraction parameters
if output_type == "specified-fields" and specified_fields:
data['specified_fields'] = ','.join(specified_fields)
elif output_type == "specified-json" and json_schema:
data['json_schema'] = json.dumps(json_schema)
# Log the request
if self.cloud_processor.api_key:
logger.info(f"Making cloud API call with authenticated access for {output_type} on {self.file_path}")
else:
logger.info(f"Making cloud API call without authentication (free tier) for {output_type} on {self.file_path}")
# Make API request
response = requests.post(
self.cloud_processor.api_url,
headers=headers,
files=files,
data=data,
timeout=300
)
# Handle rate limiting (429) specifically
if response.status_code == 429:
if not self.cloud_processor.api_key:
error_msg = (
"Rate limit exceeded for free tier (limited calls daily). "
"Run 'docstrange login' for 10,000 docs/month, or use an API key from https://app.nanonets.com/#/keys.\n"
"Examples:\n"
" - CLI: docstrange login\n"
" - Python: DocumentExtractor() # after login (uses cached credentials)\n"
" - Python: DocumentExtractor(api_key='YOUR_API_KEY') # alternative"
)
logger.error(error_msg)
raise ConversionError(error_msg)
else:
error_msg = "Rate limit exceeded (10k/month). Please try again later."
logger.error(error_msg)
raise ConversionError(error_msg)
response.raise_for_status()
result_data = response.json()
# Extract content from response
content = self.cloud_processor._extract_content_from_response(result_data)
# Cache the result
self._cached_outputs[cache_key] = content
return content
except ConversionError:
# Re-raise ConversionError (like rate limiting) without fallback
raise
except Exception as e:
logger.error(f"Failed to get {output_type} from cloud API: {e}")
# Try fallback to local conversion for other errors
return self._convert_locally(output_type)
def _convert_locally(self, output_type: str) -> str:
"""Fallback to local conversion methods."""
if output_type == "html":
return super().extract_html()
elif output_type == "flat-json":
return json.dumps(super().extract_data(), indent=2)
elif output_type == "csv":
return super().extract_csv(include_all_tables=True)
else:
return self.content
def extract_markdown(self) -> str:
"""Export as markdown."""
return self._get_cloud_output("markdown")
def extract_html(self) -> str:
"""Export as HTML."""
return self._get_cloud_output("html")
def extract_data(self, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> Dict[str, Any]:
"""Export as structured JSON with optional field extraction.
Args:
specified_fields: Optional list of specific fields to extract
json_schema: Optional JSON schema defining fields and types to extract
Returns:
Structured JSON with extracted data
"""
try:
if specified_fields:
# Request specified fields extraction
content = self._get_cloud_output("specified-fields", specified_fields=specified_fields)
extracted_data = json.loads(content)
return {
"extracted_fields": extracted_data,
"format": "specified_fields"
}
elif json_schema:
# Request JSON schema extraction
content = self._get_cloud_output("specified-json", json_schema=json_schema)
extracted_data = json.loads(content)
return {
"structured_data": extracted_data,
"format": "structured_json"
}
else:
# Standard JSON extraction
json_content = self._get_cloud_output("flat-json")
parsed_content = json.loads(json_content)
return {
"document": parsed_content,
"format": "cloud_flat_json"
}
except Exception as e:
logger.error(f"Failed to parse JSON content: {e}")
return {
"document": {"raw_content": content if 'content' in locals() else ""},
"format": "json_parse_error",
"error": str(e)
}
def extract_text(self) -> str:
"""Export as plain text."""
# For text output, we can try markdown first and then extract to text
try:
return self._get_cloud_output("markdown")
except Exception as e:
logger.error(f"Failed to get text output: {e}")
return ""
def extract_csv(self, table_index: int = 0, include_all_tables: bool = False) -> str:
"""Export tables as CSV format.
Args:
table_index: Which table to export (0-based index). Default is 0 (first table).
include_all_tables: If True, export all tables with separators. Default is False.
Returns:
CSV formatted string of the table(s)
Raises:
ValueError: If no tables are found or table_index is out of range
"""
return self._get_cloud_output("csv")
class CloudProcessor(BaseProcessor):
"""Processor for cloud-based document conversion using Nanonets API."""
def __init__(self, api_key: Optional[str] = None, output_type: str = None, model_type: Optional[str] = None,
specified_fields: Optional[list] = None, json_schema: Optional[dict] = None, **kwargs):
"""Initialize the cloud processor.
Args:
api_key: API key for cloud processing (optional - uses rate-limited free tier without key)
output_type: Output type for cloud processing (markdown, flat-json, html, csv, specified-fields, specified-json)
model_type: Model type for cloud processing (gemini, openapi, nanonets)
specified_fields: List of fields to extract (for specified-fields output type)
json_schema: JSON schema defining fields and types to extract (for specified-json output type)
"""
super().__init__(**kwargs)
self.api_key = api_key
self.output_type = output_type
self.model_type = model_type
self.specified_fields = specified_fields
self.json_schema = json_schema
self.api_url = "https://extraction-api.nanonets.com/extract"
# Don't validate output_type during initialization - it will be validated during processing
# This prevents warnings during DocumentExtractor initialization
def can_process(self, file_path: str) -> bool:
"""Check if the processor can handle the file."""
# Cloud processor supports most common document formats
# API key is optional - without it, uses rate-limited free tier
supported_extensions = {
'.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt',
'.txt', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif',
'.bmp', '.tiff', '.tif'
}
_, ext = os.path.splitext(file_path.lower())
return ext in supported_extensions
def process(self, file_path: str) -> CloudConversionResult:
"""Create a lazy CloudConversionResult that will make API calls on demand.
Args:
file_path: Path to the file to process
Returns:
CloudConversionResult that makes API calls when output methods are called
Raises:
ConversionError: If file doesn't exist
"""
if not os.path.exists(file_path):
raise ConversionError(f"File not found: {file_path}")
# Create metadata without making any API calls
metadata = {
'source_file': file_path,
'processing_mode': 'cloud',
'api_provider': 'nanonets',
'file_size': os.path.getsize(file_path),
'model_type': self.model_type,
'has_api_key': bool(self.api_key)
}
if self.api_key:
logger.info(f"Created cloud extractor for {file_path} with freeAPI key - increased limits")
else:
logger.info(f"Created cloud extractor for {file_path} without API key - rate-limited access")
# Return lazy result that will make API calls when needed
return CloudConversionResult(
file_path=file_path,
cloud_processor=self,
metadata=metadata
)
def _extract_content_from_response(self, response_data: Dict[str, Any]) -> str:
"""Extract content from API response."""
try:
# API always returns content in the 'content' field
if 'content' in response_data:
return response_data['content']
# Fallback: return whole response as JSON if no content field
logger.warning("No 'content' field found in API response, returning full response")
return json.dumps(response_data, indent=2)
except Exception as e:
logger.error(f"Failed to extract content from API response: {e}")
return json.dumps(response_data, indent=2)
def _get_content_type(self, file_path: str) -> str:
"""Get content type for file upload."""
_, ext = os.path.splitext(file_path.lower())
content_types = {
'.pdf': 'application/pdf',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.doc': 'application/msword',
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'.xls': 'application/vnd.ms-excel',
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'.ppt': 'application/vnd.ms-powerpoint',
'.txt': 'text/plain',
'.html': 'text/html',
'.htm': 'text/html',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.tiff': 'image/tiff',
'.tif': 'image/tiff'
}
return content_types.get(ext, 'application/octet-stream')
```
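A usage sketch for the cloud path. It assumes network access to the Nanonets endpoint; `invoice.pdf` and the schema fields are placeholders, and omitting the API key means the rate-limited free tier:

```python
from docstrange.processors.cloud_processor import CloudProcessor

# No API key: free tier. Pass api_key="..." (or run `docstrange login`) for higher limits.
processor = CloudProcessor()
result = processor.process("invoice.pdf")   # placeholder path; no API call happens yet

# Each output method triggers one API call per output type and caches the response.
markdown = result.extract_markdown()
print(markdown[:300])

# Schema-driven extraction maps to the "specified-json" output type.
schema = {"invoice_number": "string", "total": "string"}   # illustrative schema
structured = result.extract_data(json_schema=schema)
print(structured["format"])   # "structured_json" on success
```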
## /docstrange/processors/docx_processor.py
```py path="/docstrange/processors/docx_processor.py"
"""DOCX file processor."""
import os
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
class DOCXProcessor(BaseProcessor):
"""Processor for Microsoft Word DOCX and DOC files."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.docx', '.doc']
def process(self, file_path: str) -> ConversionResult:
"""Process the DOCX file and return a conversion result.
Args:
file_path: Path to the DOCX file to process
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
ConversionError: If processing fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Initialize metadata
metadata = {
"file_path": file_path,
"file_size": os.path.getsize(file_path),
"processor": "DOCXProcessor"
}
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
if ext == '.doc':
return self._process_doc_file(file_path, metadata)
else:
return self._process_docx_file(file_path, metadata)
def _process_doc_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
"""Process .doc files using pypandoc."""
try:
import pypandoc
# Convert .doc to markdown using pandoc
content = pypandoc.convert_file(file_path, 'markdown')
metadata.update({
"file_type": "doc",
"extractor": "pypandoc"
})
# Clean up the content
content = self._clean_content(content)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("pypandoc is required for .doc file processing. Install it with: pip install pypandoc")
except Exception as e:
raise ConversionError(f"Failed to process .doc file {file_path}: {str(e)}")
def _process_docx_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
"""Process .docx files using python-docx with improved table extraction."""
try:
from docx import Document
content_parts = []
doc = Document(file_path)
metadata.update({
"paragraph_count": len(doc.paragraphs),
"section_count": len(doc.sections),
"file_type": "docx",
"extractor": "python-docx"
})
# Extract text from paragraphs
for paragraph in doc.paragraphs:
if paragraph.text.strip():
# Check if this is a heading
if paragraph.style.name.startswith('Heading'):
level = paragraph.style.name.replace('Heading ', '')
try:
level_num = int(level)
content_parts.append(f"\n{'#' * min(level_num, 6)} {paragraph.text}\n")
except ValueError:
content_parts.append(f"\n## {paragraph.text}\n")
else:
content_parts.append(paragraph.text)
# Extract text from tables (improved)
for table_idx, table in enumerate(doc.tables):
# Check if preserve_layout is available (from base class or config)
preserve_layout = getattr(self, 'preserve_layout', False)
if preserve_layout:
content_parts.append(f"\n### Table {table_idx+1}\n")
# Gather all rows
rows = table.rows
if not rows:
continue
# Detect merged cells (optional warning)
merged_warning = False
for row in rows:
for cell in row.cells:
if len(cell._tc.xpath('.//w:vMerge')) > 0 or len(cell._tc.xpath('.//w:gridSpan')) > 0:
merged_warning = True
break
if merged_warning:
break
if merged_warning:
content_parts.append("*Warning: Table contains merged cells which may not render correctly in markdown.*\n")
# Row limit for large tables
row_limit = 20
if len(rows) > row_limit:
content_parts.append(f"*Table truncated to first {row_limit} rows out of {len(rows)} total.*\n")
# Build table data
table_data = []
for i, row in enumerate(rows):
if i >= row_limit:
break
row_data = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
table_data.append(row_data)
# Ensure all rows have the same number of columns
max_cols = max(len(r) for r in table_data)
for r in table_data:
while len(r) < max_cols:
r.append("")
# Markdown table: first row as header
if table_data:
header = table_data[0]
separator = ["---"] * len(header)
content_parts.append("| " + " | ".join(header) + " |")
content_parts.append("| " + " | ".join(separator) + " |")
for row in table_data[1:]:
content_parts.append("| " + " | ".join(row) + " |")
content_parts.append("")
content = '\n'.join(content_parts)
content = self._clean_content(content)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("python-docx is required for .docx file processing. Install it with: pip install python-docx")
except Exception as e:
raise ConversionError(f"Failed to process .docx file {file_path}: {str(e)}")
def _clean_content(self, content: str) -> str:
"""Clean up the extracted Word content.
Args:
content: Raw Word text content
Returns:
Cleaned text content
"""
# Remove excessive whitespace and normalize
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Remove excessive whitespace
line = ' '.join(line.split())
if line.strip():
cleaned_lines.append(line)
        # Join lines, inserting a blank line before markdown headers.
        # (A plain str.replace('## ', ...) would also match inside '### ' and corrupt headings.)
        spaced_lines = []
        for line in cleaned_lines:
            if line.startswith('#'):
                spaced_lines.append('')
            spaced_lines.append(line)
        content = '\n'.join(spaced_lines)
        return content.strip()
```
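A self-contained sketch: build a tiny document with python-docx (the same dependency the processor uses) and run it through `DOCXProcessor`. The file contents are invented for illustration:

```python
import os
import tempfile

from docx import Document
from docstrange.processors.docx_processor import DOCXProcessor

# Build a throwaway .docx with a heading, a paragraph, and a 2x2 table.
doc = Document()
doc.add_heading("Quarterly Report", level=1)
doc.add_paragraph("Revenue grew in Q3.")
table = doc.add_table(rows=2, cols=2)
table.rows[0].cells[0].text, table.rows[0].cells[1].text = "Region", "Revenue"
table.rows[1].cells[0].text, table.rows[1].cells[1].text = "EMEA", "1.2M"

with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
    path = tmp.name
doc.save(path)

result = DOCXProcessor(preserve_layout=True).process(path)
print(result.content)                       # "# Quarterly Report", text, markdown table
print(result.metadata["paragraph_count"])
os.unlink(path)
```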
## /docstrange/processors/excel_processor.py
```py path="/docstrange/processors/excel_processor.py"
"""Excel file processor."""
import os
import logging
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
# Configure logging
logger = logging.getLogger(__name__)
class ExcelProcessor(BaseProcessor):
"""Processor for Excel files (XLSX, XLS) and CSV files."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.xlsx', '.xls', '.csv']
def process(self, file_path: str) -> ConversionResult:
"""Process the Excel file and return a conversion result.
Args:
file_path: Path to the Excel file to process
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
ConversionError: If processing fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
if ext == '.csv':
return self._process_csv(file_path)
else:
return self._process_excel(file_path)
def _process_csv(self, file_path: str) -> ConversionResult:
"""Process a CSV file and return a conversion result.
Args:
file_path: Path to the CSV file to process
Returns:
ConversionResult containing the processed content
"""
try:
import pandas as pd
df = pd.read_csv(file_path)
content_parts = []
content_parts.append(f"# CSV Data: {os.path.basename(file_path)}")
content_parts.append("")
# Convert DataFrame to markdown table
table_md = self._dataframe_to_markdown(df, pd)
content_parts.append(table_md)
metadata = {
"row_count": len(df),
"column_count": len(df.columns),
"columns": df.columns.tolist(),
"extractor": "pandas"
}
content = '\n'.join(content_parts)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("pandas is required for CSV processing. Install it with: pip install pandas")
except Exception as e:
raise ConversionError(f"Failed to process CSV file {file_path}: {str(e)}")
def _process_excel(self, file_path: str) -> ConversionResult:
"""Process an Excel file and return a conversion result.
Args:
file_path: Path to the Excel file to process
Returns:
ConversionResult containing the processed content
"""
try:
import pandas as pd
excel_file = pd.ExcelFile(file_path)
sheet_names = excel_file.sheet_names
metadata = {
"sheet_count": len(sheet_names),
"sheet_names": sheet_names,
"extractor": "pandas"
}
content_parts = []
for sheet_name in sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name)
if not df.empty:
content_parts.append(f"\n## Sheet: {sheet_name}")
content_parts.append("")
# Convert DataFrame to markdown table
table_md = self._dataframe_to_markdown(df, pd)
content_parts.append(table_md)
content_parts.append("")
# Add metadata for this sheet
metadata.update({
f"sheet_{sheet_name}_rows": len(df),
f"sheet_{sheet_name}_columns": len(df.columns),
f"sheet_{sheet_name}_columns_list": df.columns.tolist()
})
content = '\n'.join(content_parts)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("pandas and openpyxl are required for Excel processing. Install them with: pip install pandas openpyxl")
except Exception as e:
if isinstance(e, (FileNotFoundError, ConversionError)):
raise
raise ConversionError(f"Failed to process Excel file {file_path}: {str(e)}")
def _dataframe_to_markdown(self, df, pd) -> str:
"""Convert pandas DataFrame to markdown table.
Args:
df: pandas DataFrame
pd: pandas module reference
Returns:
Markdown table string
"""
if df.empty:
return "*No data available*"
# Convert DataFrame to markdown table
markdown_parts = []
# Header
markdown_parts.append("| " + " | ".join(str(col) for col in df.columns) + " |")
markdown_parts.append("| " + " | ".join(["---"] * len(df.columns)) + " |")
# Data rows
for _, row in df.iterrows():
row_data = []
for cell in row:
if pd.isna(cell):
row_data.append("")
else:
row_data.append(str(cell))
markdown_parts.append("| " + " | ".join(row_data) + " |")
return "\n".join(markdown_parts)
def _clean_content(self, content: str) -> str:
"""Clean up the extracted Excel content.
Args:
content: Raw Excel text content
Returns:
Cleaned text content
"""
# Remove excessive whitespace and normalize
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Remove excessive whitespace
line = ' '.join(line.split())
if line.strip():
cleaned_lines.append(line)
        # Join lines, inserting a blank line before markdown headers.
        # (Replacing '# ' directly would also match inside '## ' and corrupt headings.)
        spaced_lines = []
        for line in cleaned_lines:
            if line.startswith('#'):
                spaced_lines.append('')
            spaced_lines.append(line)
        content = '\n'.join(spaced_lines)
        return content.strip()
```
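A sketch for the CSV branch: write a small CSV to a temporary file and render it as a markdown table. pandas is assumed to be installed; the data is invented:

```python
import csv
import os
import tempfile

from docstrange.processors.excel_processor import ExcelProcessor

with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    writer = csv.writer(tmp)
    writer.writerow(["city", "population"])
    writer.writerow(["Lisbon", "545000"])
    writer.writerow(["Porto", "232000"])
    path = tmp.name

result = ExcelProcessor().process(path)
print(result.content)                   # "# CSV Data: ..." plus a markdown table
print(result.metadata["row_count"])     # 2 data rows
os.unlink(path)
```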
## /docstrange/processors/gpu_processor.py
```py path="/docstrange/processors/gpu_processor.py"
"""GPU processor with OCR capabilities for images and PDFs."""
import os
import json
import logging
import tempfile
import re
from typing import Dict, Any, List, Optional
from pathlib import Path
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
from ..pipeline.ocr_service import OCRServiceFactory
# Configure logging
logger = logging.getLogger(__name__)
class GPUConversionResult(ConversionResult):
"""Enhanced ConversionResult for GPU processing with Nanonets OCR capabilities."""
def __init__(self, content: str, metadata: Optional[Dict[str, Any]] = None,
gpu_processor: Optional['GPUProcessor'] = None, file_path: Optional[str] = None,
ocr_provider: str = "nanonets"):
super().__init__(content, metadata)
self.gpu_processor = gpu_processor
self.file_path = file_path
self.ocr_provider = ocr_provider
# Add GPU-specific metadata
if metadata is None:
self.metadata = {}
# Ensure GPU-specific metadata is present
if 'processing_mode' not in self.metadata:
self.metadata['processing_mode'] = 'gpu'
if 'ocr_provider' not in self.metadata:
self.metadata['ocr_provider'] = ocr_provider
if 'gpu_processing' not in self.metadata:
self.metadata['gpu_processing'] = True
def get_ocr_info(self) -> Dict[str, Any]:
"""Get information about the OCR processing used.
Returns:
Dictionary with OCR processing information
"""
return {
'ocr_provider': self.ocr_provider,
'processing_mode': 'gpu',
'file_path': self.file_path,
'gpu_processor_available': self.gpu_processor is not None
}
def extract_markdown(self) -> str:
"""Export as markdown without GPU processing metadata."""
return self.content
def extract_html(self) -> str:
"""Export as HTML with GPU processing styling."""
# Get the base HTML from parent class
html_content = super().extract_html()
# Add GPU processing indicator
gpu_indicator = f"""
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 1rem; border-radius: 8px; margin-bottom: 2rem; text-align: center;">
<strong>🚀 GPU Processed</strong> - Enhanced with {self.ocr_provider} OCR
</div>
"""
# Insert the indicator after the opening body tag
body_start = html_content.find('<body')
if body_start != -1:
body_end = html_content.find('>', body_start) + 1
return html_content[:body_end] + gpu_indicator + html_content[body_end:]
return html_content
def extract_data(self) -> Dict[str, Any]:
"""Export as structured JSON using Nanonets model with specific prompt."""
print("=== GPUConversionResult.extract_data() called ===")
print(f"gpu_processor: {self.gpu_processor}")
print(f"file_path: {self.file_path}")
print(f"file_exists: {os.path.exists(self.file_path) if self.file_path else False}")
try:
# If we have a GPU processor and file path, use the model to extract JSON
if self.gpu_processor and self.file_path and os.path.exists(self.file_path):
logger.info("Using Nanonets model for JSON extraction")
return self._extract_json_with_model()
else:
logger.info("Using fallback JSON conversion")
# Fallback to base JSON conversion
return self._convert_to_base_json()
except Exception as e:
logger.warning(f"Failed to extract JSON with model: {e}. Using fallback conversion.")
return self._convert_to_base_json()
def _extract_json_with_model(self) -> Dict[str, Any]:
"""Extract structured JSON using Nanonets model with specific prompt."""
try:
from PIL import Image
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText
# Get the model from the GPU processor's OCR service
ocr_service = self.gpu_processor._get_ocr_service()
# Access the model components from the OCR service
if hasattr(ocr_service, 'processor') and hasattr(ocr_service, 'model') and hasattr(ocr_service, 'tokenizer'):
model = ocr_service.model
processor = ocr_service.processor
tokenizer = ocr_service.tokenizer
else:
# Fallback: load model directly
model_path = "nanonets/Nanonets-OCR-s"
model = AutoModelForImageTextToText.from_pretrained(
model_path,
torch_dtype="auto",
device_map="auto"
)
model.eval()
processor = AutoProcessor.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# Define the JSON extraction prompt
prompt = """Extract all information from the above document and return it as a valid JSON object.
Instructions:
- The output should be a single JSON object.
- Keys should be meaningful field names.
- If multiple similar blocks (like invoice items or line items), return a list of JSON objects under a key.
- Use strings for all values.
- Wrap page numbers using: "page_number": "1"
- Wrap watermarks using: "watermark": "CONFIDENTIAL"
- Use ☐ and ☑ for checkboxes.
Example:
{
"Name": "John Doe",
"Invoice Number": "INV-4567",
"Amount Due": "$123.45",
"Items": [
{"Description": "Widget A", "Price": "$20"},
{"Description": "Widget B", "Price": "$30"}
],
"page_number": "1",
"watermark": "CONFIDENTIAL"
}"""
# Load the image
image = Image.open(self.file_path)
# Prepare messages for the model
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": [
{"type": "image", "image": f"file://{self.file_path}"},
{"type": "text", "text": prompt},
]},
]
# Apply chat template and process
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt")
inputs = inputs.to(model.device)
# Generate JSON response
output_ids = model.generate(**inputs, max_new_tokens=15000, do_sample=False)
generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, output_ids)]
json_text = processor.batch_decode(generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)[0]
print(f"json_text: {json_text}")
# Try to parse the JSON response with improved parsing
def try_parse_json(text):
try:
return json.loads(text)
except json.JSONDecodeError:
# Try cleaning and reparsing
try:
text = re.sub(r"(\w+):", r'"\1":', text) # wrap keys
text = text.replace("'", '"') # replace single quotes
return json.loads(text)
except:
return {"raw_text": text}
# Parse the JSON
extracted_data = try_parse_json(json_text)
# Create the result structure
result = {
"document": extracted_data,
"format": "gpu_structured_json",
"gpu_processing_info": {
'ocr_provider': self.ocr_provider,
'processing_mode': 'gpu',
'file_path': self.file_path,
'gpu_processor_available': self.gpu_processor is not None,
'json_extraction_method': 'nanonets_model'
}
}
return result
except Exception as e:
logger.error(f"Failed to extract JSON with model: {e}")
raise
def _convert_to_base_json(self) -> Dict[str, Any]:
"""Fallback to base JSON conversion method."""
# Get the base JSON from parent class
base_json = super().extract_data()
# Add GPU-specific metadata
base_json['gpu_processing_info'] = {
'ocr_provider': self.ocr_provider,
'processing_mode': 'gpu',
'file_path': self.file_path,
'gpu_processor_available': self.gpu_processor is not None,
'json_extraction_method': 'fallback_conversion'
}
# Update the format to indicate GPU processing
base_json['format'] = 'gpu_structured_json'
return base_json
def extract_text(self) -> str:
"""Export as plain text without GPU processing header."""
return self.content
def get_processing_stats(self) -> Dict[str, Any]:
"""Get processing statistics and information.
Returns:
Dictionary with processing statistics
"""
stats = {
'processing_mode': 'gpu',
'ocr_provider': self.ocr_provider,
'file_path': self.file_path,
'content_length': len(self.content),
'word_count': len(self.content.split()),
'line_count': len(self.content.split('\n')),
'gpu_processor_available': self.gpu_processor is not None
}
# Add metadata if available
if self.metadata:
stats['metadata'] = self.metadata
return stats
class GPUProcessor(BaseProcessor):
"""Processor for image files and PDFs with Nanonets OCR capabilities."""
def __init__(self, preserve_layout: bool = True, include_images: bool = False, ocr_enabled: bool = True, use_markdownify: bool = None, ocr_service=None):
super().__init__(preserve_layout, include_images, ocr_enabled, use_markdownify)
self._ocr_service = ocr_service
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif', '.pdf']
def _get_ocr_service(self):
"""Get OCR service instance."""
if self._ocr_service is not None:
return self._ocr_service
# Use Nanonets OCR service by default
self._ocr_service = OCRServiceFactory.create_service('nanonets')
return self._ocr_service
def process(self, file_path: str) -> GPUConversionResult:
"""Process image file or PDF with OCR capabilities.
Args:
file_path: Path to the image file or PDF
Returns:
GPUConversionResult with extracted content
"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Check file type
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
if ext == '.pdf':
logger.info(f"Processing PDF file: {file_path}")
return self._process_pdf(file_path)
else:
logger.info(f"Processing image file: {file_path}")
return self._process_image(file_path)
except Exception as e:
logger.error(f"Failed to process file {file_path}: {e}")
raise ConversionError(f"GPU processing failed: {e}")
def _process_image(self, file_path: str) -> GPUConversionResult:
"""Process image file with OCR capabilities.
Args:
file_path: Path to the image file
Returns:
GPUConversionResult with extracted content
"""
# Get OCR service
ocr_service = self._get_ocr_service()
# Extract text with layout awareness if enabled
if self.ocr_enabled and self.preserve_layout:
logger.info("Extracting text with layout awareness using Nanonets OCR")
extracted_text = ocr_service.extract_text_with_layout(file_path)
elif self.ocr_enabled:
logger.info("Extracting text without layout awareness using Nanonets OCR")
extracted_text = ocr_service.extract_text(file_path)
else:
logger.warning("OCR is disabled, returning empty content")
extracted_text = ""
# Create GPU result
result = GPUConversionResult(
content=extracted_text,
metadata={
'file_path': file_path,
'file_type': 'image',
'ocr_enabled': self.ocr_enabled,
'preserve_layout': self.preserve_layout,
'ocr_provider': 'nanonets'
},
gpu_processor=self,
file_path=file_path,
ocr_provider='nanonets'
)
logger.info(f"Image processing completed. Extracted {len(extracted_text)} characters")
return result
def _process_pdf(self, file_path: str) -> GPUConversionResult:
"""Process PDF file by converting to images and using OCR.
Args:
file_path: Path to the PDF file
Returns:
GPUConversionResult with extracted content
"""
try:
# Convert PDF to images
image_paths = self._convert_pdf_to_images(file_path)
if not image_paths:
logger.warning("No pages could be extracted from PDF")
return GPUConversionResult(
content="",
metadata={
'file_path': file_path,
'file_type': 'pdf',
'ocr_enabled': self.ocr_enabled,
'preserve_layout': self.preserve_layout,
'ocr_provider': 'nanonets',
'pages_processed': 0
},
gpu_processor=self,
file_path=file_path,
ocr_provider='nanonets'
)
# Process each page with OCR
all_texts = []
ocr_service = self._get_ocr_service()
for i, image_path in enumerate(image_paths):
logger.info(f"Processing PDF page {i+1}/{len(image_paths)}")
try:
if self.ocr_enabled and self.preserve_layout:
page_text = ocr_service.extract_text_with_layout(image_path)
elif self.ocr_enabled:
page_text = ocr_service.extract_text(image_path)
else:
page_text = ""
# Add page header and content if there's text
if page_text.strip():
# Add page header (markdown style)
all_texts.append(f"\n## Page {i+1}\n\n")
all_texts.append(page_text)
# Add horizontal rule after content (except for last page)
if i < len(image_paths) - 1:
all_texts.append("\n\n---\n\n")
except Exception as e:
logger.error(f"Failed to process page {i+1}: {e}")
# Add error page with markdown formatting
all_texts.append(f"\n## Page {i+1}\n\n*Error processing this page: {e}*\n\n")
if i < len(image_paths) - 1:
all_texts.append("---\n\n")
finally:
# Clean up temporary image file
try:
os.unlink(image_path)
except:
pass
# Combine all page texts
combined_text = ''.join(all_texts)
# Create result
result = GPUConversionResult(
content=combined_text,
metadata={
'file_path': file_path,
'file_type': 'pdf',
'ocr_enabled': self.ocr_enabled,
'preserve_layout': self.preserve_layout,
'ocr_provider': 'nanonets',
'pages_processed': len(image_paths)
},
gpu_processor=self,
file_path=file_path,
ocr_provider='nanonets'
)
logger.info(f"PDF processing completed. Processed {len(image_paths)} pages, extracted {len(combined_text)} characters")
return result
except Exception as e:
logger.error(f"Failed to process PDF {file_path}: {e}")
raise ConversionError(f"PDF processing failed: {e}")
def _convert_pdf_to_images(self, pdf_path: str) -> List[str]:
"""Convert PDF pages to images.
Args:
pdf_path: Path to the PDF file
Returns:
List of paths to temporary image files
"""
try:
from pdf2image import convert_from_path
from ..config import InternalConfig
# Get DPI from config
dpi = getattr(InternalConfig, 'pdf_image_dpi', 300)
# Convert PDF pages to images using pdf2image
images = convert_from_path(pdf_path, dpi=dpi)
image_paths = []
# Save each image to a temporary file
for page_num, image in enumerate(images):
                # tempfile.mktemp is deprecated and race-prone; create the file safely instead
                with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
                    persistent_image_path = tmp.name
                image.save(persistent_image_path, 'PNG')
image_paths.append(persistent_image_path)
logger.info(f"Converted PDF to {len(image_paths)} images")
return image_paths
except ImportError:
logger.error("pdf2image not available. Please install it: pip install pdf2image")
raise ConversionError("pdf2image is required for PDF processing")
except Exception as e:
logger.error(f"Failed to extract PDF to images: {e}")
raise ConversionError(f"PDF to image conversion failed: {e}")
@staticmethod
def predownload_ocr_models():
"""Pre-download OCR models by running a dummy prediction."""
try:
from docstrange.pipeline.ocr_service import OCRServiceFactory
ocr_service = OCRServiceFactory.create_service('nanonets')
# Create a blank image for testing
from PIL import Image
import tempfile
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
img = Image.new('RGB', (100, 100), color='white')
img.save(tmp.name)
ocr_service.extract_text_with_layout(tmp.name)
os.unlink(tmp.name)
print("Nanonets OCR models pre-downloaded and cached.")
except Exception as e:
print(f"Failed to pre-download Nanonets OCR models: {e}")
```
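A usage sketch for the GPU path. Constructing the OCR service downloads and loads the Nanonets model, so this is slow without a GPU; `scan.pdf` is a placeholder path:

```python
from docstrange.processors.gpu_processor import GPUProcessor

processor = GPUProcessor(preserve_layout=True)

if processor.can_process("scan.pdf"):            # placeholder path
    result = processor.process("scan.pdf")
    print(result.extract_markdown()[:500])       # page-by-page markdown
    print(result.get_processing_stats())         # content length, word count, provider, ...

    # Model-driven JSON extraction; falls back to the base conversion on failure.
    data = result.extract_data()
    print(data["format"])                        # "gpu_structured_json"
```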
## /docstrange/processors/html_processor.py
```py path="/docstrange/processors/html_processor.py"
"""HTML file processor."""
import os
import logging
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
# Configure logging
logger = logging.getLogger(__name__)
class HTMLProcessor(BaseProcessor):
"""Processor for HTML files using markdownify for conversion."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.html', '.htm']
def process(self, file_path: str) -> ConversionResult:
"""Process the HTML file and return a conversion result.
Args:
file_path: Path to the HTML file to process
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
ConversionError: If processing fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
try:
from markdownify import markdownify as md
except ImportError:
raise ConversionError("markdownify is required for HTML processing. Install it with: pip install markdownify")
metadata = self.get_metadata(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
content = md(html_content, heading_style="ATX")
return ConversionResult(content, metadata)
except Exception as e:
if isinstance(e, (FileNotFoundError, ConversionError)):
raise
raise ConversionError(f"Failed to process HTML file {file_path}: {str(e)}")
```
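A quick sketch, assuming markdownify is installed; the HTML snippet is invented:

```python
import os
import tempfile

from docstrange.processors.html_processor import HTMLProcessor

html = "<h1>Release Notes</h1><p>Bug fixes and <strong>performance</strong> work.</p>"
with tempfile.NamedTemporaryFile("w", suffix=".html", delete=False, encoding="utf-8") as tmp:
    tmp.write(html)
    path = tmp.name

result = HTMLProcessor().process(path)
print(result.content)   # "# Release Notes" followed by the paragraph in markdown
os.unlink(path)
```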
## /docstrange/processors/image_processor.py
```py path="/docstrange/processors/image_processor.py"
"""Image file processor with OCR capabilities."""
import os
import logging
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
from ..pipeline.ocr_service import OCRServiceFactory
# Configure logging
logger = logging.getLogger(__name__)
class ImageProcessor(BaseProcessor):
"""Processor for image files (JPG, PNG, etc.) with OCR capabilities."""
def __init__(self, preserve_layout: bool = True, include_images: bool = False, ocr_enabled: bool = True, use_markdownify: bool = None, ocr_service=None):
super().__init__(preserve_layout, include_images, ocr_enabled, use_markdownify)
self._ocr_service = ocr_service
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif']
def _get_ocr_service(self):
"""Get OCR service instance."""
if self._ocr_service is not None:
return self._ocr_service
self._ocr_service = OCRServiceFactory.create_service()
return self._ocr_service
def process(self, file_path: str) -> ConversionResult:
"""Process image file with OCR capabilities.
Args:
file_path: Path to the image file
Returns:
ConversionResult with extracted content
"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"Image file not found: {file_path}")
logger.info(f"Processing image file: {file_path}")
# Get OCR service
ocr_service = self._get_ocr_service()
# Extract text with layout awareness if enabled
if self.ocr_enabled and self.preserve_layout:
logger.info("Extracting text with layout awareness")
extracted_text = ocr_service.extract_text_with_layout(file_path)
elif self.ocr_enabled:
logger.info("Extracting text without layout awareness")
extracted_text = ocr_service.extract_text(file_path)
else:
logger.warning("OCR is disabled, returning empty content")
extracted_text = ""
# Create result
result = ConversionResult(
content=extracted_text,
metadata={
'file_path': file_path,
'file_type': 'image',
'ocr_enabled': self.ocr_enabled,
'preserve_layout': self.preserve_layout
}
)
logger.info(f"Image processing completed. Extracted {len(extracted_text)} characters")
return result
except Exception as e:
logger.error(f"Failed to process image file {file_path}: {e}")
raise ConversionError(f"Image processing failed: {e}")
@staticmethod
def predownload_ocr_models():
"""Pre-download OCR models by running a dummy prediction."""
try:
            from docstrange.pipeline.ocr_service import OCRServiceFactory
ocr_service = OCRServiceFactory.create_service()
# Create a blank image for testing
from PIL import Image
import tempfile
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
img = Image.new('RGB', (100, 100), color='white')
img.save(tmp.name)
ocr_service.extract_text_with_layout(tmp.name)
os.unlink(tmp.name)
print("OCR models pre-downloaded and cached.")
except Exception as e:
print(f"Failed to pre-download OCR models: {e}")
```
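Because the constructor accepts an injected `ocr_service`, the processor can be exercised without downloading any models. The stub below is purely illustrative (useful for tests); a blank PNG stands in for a real scan:

```python
import os
import tempfile

from PIL import Image
from docstrange.pipeline.ocr_service import OCRService
from docstrange.processors.image_processor import ImageProcessor

class StubOCR(OCRService):
    """Fake OCR service: returns canned text instead of running a model."""

    def extract_text(self, image_path: str) -> str:
        return "stub plain text"

    def extract_text_with_layout(self, image_path: str) -> str:
        return "# Stub Heading\n\nstub layout-aware text"

with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
    path = tmp.name
Image.new("RGB", (10, 10), "white").save(path)

result = ImageProcessor(ocr_service=StubOCR()).process(path)
print(result.content)                   # "# Stub Heading" ...
print(result.metadata["file_type"])     # "image"
os.unlink(path)
```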
## /docstrange/processors/pdf_processor.py
```py path="/docstrange/processors/pdf_processor.py"
"""PDF file processor with OCR support for scanned PDFs."""
import os
import logging
import tempfile
from typing import Dict, Any, List, Tuple
from .base import BaseProcessor
from .image_processor import ImageProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
from ..config import InternalConfig
from ..pipeline.ocr_service import OCRServiceFactory, NeuralOCRService
# Configure logging
logger = logging.getLogger(__name__)
class PDFProcessor(BaseProcessor):
"""Processor for PDF files using PDF-to-image conversion with OCR."""
def __init__(self, preserve_layout: bool = True, include_images: bool = False, ocr_enabled: bool = True, use_markdownify: bool = None):
super().__init__(preserve_layout, include_images, ocr_enabled, use_markdownify)
# Create a shared OCR service instance for all pages
shared_ocr_service = NeuralOCRService()
self._image_processor = ImageProcessor(
preserve_layout=preserve_layout,
include_images=include_images,
ocr_enabled=ocr_enabled,
use_markdownify=use_markdownify,
ocr_service=shared_ocr_service
)
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext == '.pdf'
def process(self, file_path: str) -> ConversionResult:
"""Process PDF file with OCR capabilities.
Args:
file_path: Path to the PDF file
Returns:
ConversionResult with extracted content
"""
try:
from ..config import InternalConfig
pdf_to_image_enabled = InternalConfig.pdf_to_image_enabled
except (ImportError, AttributeError):
# Fallback if config is not available
pdf_to_image_enabled = True
logger.warning("InternalConfig not available, defaulting to pdf_to_image_enabled = True")
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"PDF file not found: {file_path}")
logger.info(f"Processing PDF file: {file_path}")
logger.info(f"pdf_to_image_enabled = {pdf_to_image_enabled}")
# Always use OCR-based processing (pdf2image + OCR)
logger.info("Using OCR-based PDF processing with pdf2image")
return self._process_with_ocr(file_path)
except Exception as e:
logger.error(f"Failed to process PDF file {file_path}: {e}")
raise ConversionError(f"PDF processing failed: {e}")
def _process_with_ocr(self, file_path: str) -> ConversionResult:
"""Process PDF using OCR after converting pages to images."""
try:
from pdf2image import convert_from_path
from ..config import InternalConfig
# Get DPI from config
dpi = getattr(InternalConfig, 'pdf_image_dpi', 300)
# Convert PDF pages to images using pdf2image
images = convert_from_path(file_path, dpi=dpi)
page_count = len(images)
all_content = []
for page_num, image in enumerate(images):
# Save to temporary file for OCR processing
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
image.save(tmp.name, 'PNG')
temp_image_path = tmp.name
try:
# Process the page image
page_result = self._image_processor.process(temp_image_path)
page_content = page_result.content
if page_content.strip():
all_content.append(f"## Page {page_num + 1}\n\n{page_content}")
finally:
# Clean up temporary file
os.unlink(temp_image_path)
content = "\n\n".join(all_content) if all_content else "No content extracted from PDF"
return ConversionResult(
content=content,
metadata={
'file_path': file_path,
'file_type': 'pdf',
'pages': page_count,
'extraction_method': 'ocr'
}
)
except ImportError:
logger.error("pdf2image not available. Please install it: pip install pdf2image")
raise ConversionError("pdf2image is required for PDF processing")
except Exception as e:
logger.error(f"OCR-based PDF processing failed: {e}")
raise ConversionError(f"OCR-based PDF processing failed: {e}")
def _convert_page_to_image(self, pdf_path: str, page_num: int) -> str:
"""Convert a PDF page to an image file.
Args:
pdf_path: Path to the PDF file
page_num: Page number (0-based)
Returns:
Path to the temporary image file
"""
try:
from pdf2image import convert_from_path
from ..config import InternalConfig
# Use configuration for image quality
dpi = getattr(InternalConfig, 'pdf_image_dpi', 300)
# Convert specific page to image
images = convert_from_path(pdf_path, dpi=dpi, first_page=page_num + 1, last_page=page_num + 1)
if not images:
logger.error(f"Failed to extract page {page_num + 1} to image")
return None
# Save to temporary file
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file:
images[0].save(tmp_file.name, 'PNG')
logger.debug(f"Page {page_num + 1} converted to image: {tmp_file.name}")
return tmp_file.name
except Exception as e:
logger.error(f"Failed to extract page {page_num + 1} to image: {e}")
return None
def _extract_ocr_text_from_result(self, result: ConversionResult) -> str:
"""Extract OCR text from ImageProcessor result.
Args:
result: ConversionResult from ImageProcessor
Returns:
Extracted OCR text
"""
try:
content = result.content
# Look for OCR section in the content
if "## Extracted Text (OCR)" in content:
# Extract text after the OCR header
parts = content.split("## Extracted Text (OCR)")
if len(parts) > 1:
ocr_section = parts[1]
# Remove any remaining headers and clean up
lines = ocr_section.split('\n')
text_lines = []
in_ocr_text = False
for line in lines:
if line.strip() == "":
continue
elif line.startswith("##"):
# Stop at next header
break
else:
text_lines.append(line)
return '\n'.join(text_lines).strip()
# If no OCR section found, return the full content
return content
except Exception as e:
logger.error(f"Failed to extract OCR text from result: {e}")
return ""
def _format_page_content(self, text: str, page_num: int) -> str:
"""Format page content as markdown with enhanced structure.
Args:
text: Extracted text
page_num: Page number
Returns:
Formatted markdown content
"""
if not text.strip():
return f"\n## Page {page_num}\n\n*This page appears to be empty or contains no extractable text.*\n"
# The text from nanonets-ocr already has proper markdown structure
# Just add page header
content_parts = [f"## Page {page_num}"]
content_parts.append("")
content_parts.append(text)
content_parts.append("")
return '\n'.join(content_parts)
@staticmethod
def predownload_ocr_models():
"""Pre-download OCR models by running a dummy prediction."""
try:
# Use ImageProcessor's predownload method
ImageProcessor.predownload_ocr_models()
except Exception as e:
print(f"Failed to pre-download OCR models: {e}")
```
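A usage sketch for the OCR-based PDF path. It needs pdf2image (plus a local poppler install) and the neural OCR models, so treat it as illustrative; `contract.pdf` is a placeholder path:

```python
from docstrange.processors.pdf_processor import PDFProcessor

processor = PDFProcessor(preserve_layout=True)   # loads the shared neural OCR service

if processor.can_process("contract.pdf"):        # placeholder path
    result = processor.process("contract.pdf")
    print(result.metadata["pages"], "pages via", result.metadata["extraction_method"])
    with open("contract.md", "w", encoding="utf-8") as out:
        out.write(result.content)                # one "## Page N" section per page
```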
## /docstrange/processors/pptx_processor.py
```py path="/docstrange/processors/pptx_processor.py"
"""PowerPoint file processor."""
import os
import logging
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
# Configure logging
logger = logging.getLogger(__name__)
class PPTXProcessor(BaseProcessor):
"""Processor for PowerPoint files (PPT, PPTX)."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.ppt', '.pptx']
def process(self, file_path: str) -> ConversionResult:
"""Process the PowerPoint file and return a conversion result.
Args:
file_path: Path to the PowerPoint file to process
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
ConversionError: If processing fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Initialize metadata
metadata = {
"file_path": file_path,
"file_size": os.path.getsize(file_path),
"processor": "PPTXProcessor"
}
# Check file extension to determine processing method
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
if ext == '.ppt':
return self._process_ppt_file(file_path, metadata)
else:
return self._process_pptx_file(file_path, metadata)
def _process_ppt_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
"""Process .ppt files using pypandoc."""
try:
import pypandoc
# Convert .ppt to markdown using pandoc
content = pypandoc.convert_file(file_path, 'markdown')
metadata.update({
"file_type": "ppt",
"extractor": "pypandoc"
})
# Clean up the content
content = self._clean_content(content)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("pypandoc is required for .ppt file processing. Install it with: pip install pypandoc")
except Exception as e:
raise ConversionError(f"Failed to process .ppt file {file_path}: {str(e)}")
def _process_pptx_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
"""Process .pptx files using python-pptx."""
try:
from pptx import Presentation
content_parts = []
prs = Presentation(file_path)
metadata.update({
"slide_count": len(prs.slides),
"file_type": "pptx",
"extractor": "python-pptx"
})
# Check if preserve_layout is available (from base class or config)
preserve_layout = getattr(self, 'preserve_layout', False)
for slide_num, slide in enumerate(prs.slides, 1):
if preserve_layout:
content_parts.append(f"\n## Slide {slide_num}\n")
slide_content = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
slide_content.append(shape.text.strip())
if slide_content:
content_parts.extend(slide_content)
content_parts.append("") # Add spacing between slides
content = "\n\n".join(content_parts)
# Clean up the content
content = self._clean_content(content)
return ConversionResult(content, metadata)
except ImportError:
raise ConversionError("python-pptx is required for .pptx file processing. Install it with: pip install python-pptx")
except Exception as e:
if isinstance(e, (FileNotFoundError, ConversionError)):
raise
raise ConversionError(f"Failed to process .pptx file {file_path}: {str(e)}")
def _clean_content(self, content: str) -> str:
"""Clean up the extracted PowerPoint content.
Args:
content: Raw PowerPoint text content
Returns:
Cleaned text content
"""
# Remove excessive whitespace and normalize
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Remove excessive whitespace
line = ' '.join(line.split())
if line.strip():
cleaned_lines.append(line)
# Join lines and add proper spacing
content = '\n'.join(cleaned_lines)
# Add spacing around headers
content = content.replace('## Slide', '\n## Slide')
return content.strip()
```
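A quick way to see what `PPTXProcessor` produces for a `.pptx` is to run the same python-pptx walk directly. A minimal sketch, assuming python-pptx is installed (the path is a placeholder; unlike the processor, it adds a slide header unconditionally rather than only when `preserve_layout` is set):
```py
# Sketch of the slide/shape walk performed by PPTXProcessor (placeholder path).
from pptx import Presentation

def pptx_to_markdown(path: str) -> str:
    prs = Presentation(path)
    parts = []
    for slide_num, slide in enumerate(prs.slides, 1):
        parts.append(f"## Slide {slide_num}")
        for shape in slide.shapes:
            # Only text-bearing shapes expose a .text attribute
            if hasattr(shape, "text") and shape.text.strip():
                parts.append(shape.text.strip())
    return "\n\n".join(parts)

# print(pptx_to_markdown("sample_documents/deck.pptx"))
```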
## /docstrange/processors/txt_processor.py
```py path="/docstrange/processors/txt_processor.py"
"""Text file processor."""
import os
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
class TXTProcessor(BaseProcessor):
"""Processor for plain text files."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check
Returns:
True if this processor can handle the file
"""
if not os.path.exists(file_path):
return False
# Check file extension - ensure file_path is a string
file_path_str = str(file_path)
_, ext = os.path.splitext(file_path_str.lower())
return ext in ['.txt', '.text']
def process(self, file_path: str) -> ConversionResult:
"""Process the text file and return a conversion result.
Args:
file_path: Path to the text file to process
Returns:
ConversionResult containing the processed content
Raises:
FileNotFoundError: If the file doesn't exist
ConversionError: If processing fails
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
# Try different encodings
encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
content = None
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
break
except UnicodeDecodeError:
continue
if content is None:
raise ConversionError(f"Could not decode file {file_path} with any supported encoding")
# Clean up the content
content = self._clean_content(content)
metadata = self.get_metadata(file_path)
metadata.update({
"encoding": encoding,
"line_count": len(content.split('\n')),
"word_count": len(content.split())
})
return ConversionResult(content, metadata)
except Exception as e:
if isinstance(e, (FileNotFoundError, ConversionError)):
raise
raise ConversionError(f"Failed to process text file {file_path}: {str(e)}")
def _clean_content(self, content: str) -> str:
"""Clean up the text content.
Args:
content: Raw text content
Returns:
Cleaned text content
"""
# Remove excessive whitespace
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Remove trailing whitespace
line = line.rstrip()
cleaned_lines.append(line)
# Remove empty lines at the beginning and end
while cleaned_lines and not cleaned_lines[0].strip():
cleaned_lines.pop(0)
while cleaned_lines and not cleaned_lines[-1].strip():
cleaned_lines.pop()
return '\n'.join(cleaned_lines)
```
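The core of `TXTProcessor` is the encoding-fallback loop; a minimal standalone sketch (the helper name is illustrative):
```py
# Encoding-fallback read mirroring TXTProcessor (illustrative helper).
def read_text_with_fallback(path: str) -> str:
    for encoding in ("utf-8", "latin-1", "cp1252", "iso-8859-1"):
        try:
            with open(path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
    raise ValueError(f"Could not decode {path} with any supported encoding")
```
Note that `latin-1` maps every byte value, so the entries after it are effectively unreachable; they are kept here only to mirror the processor's list.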
## /docstrange/processors/url_processor.py
```py path="/docstrange/processors/url_processor.py"
"""URL processor for handling web pages and file downloads."""
import os
import re
import tempfile
from typing import Dict, Any, Optional
from urllib.parse import urlparse
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, NetworkError
class URLProcessor(BaseProcessor):
"""Processor for URLs and web pages."""
def can_process(self, file_path: str) -> bool:
"""Check if this processor can handle the given file.
Args:
file_path: Path to the file to check (or URL)
Returns:
True if this processor can handle the file
"""
# Check if it looks like a URL
return self._is_url(file_path)
def process(self, file_path: str) -> ConversionResult:
"""Process the URL and return a conversion result.
Args:
file_path: URL to process
Returns:
ConversionResult containing the processed content
Raises:
NetworkError: If network operations fail
ConversionError: If processing fails
"""
try:
import requests
# First, check if this URL points to a file
file_info = self._detect_file_from_url(file_path)
if file_info:
# This is a file URL, download and process it
return self._process_file_url(file_path, file_info)
else:
# This is a web page, process it as HTML
return self._process_web_page(file_path)
except ImportError:
raise ConversionError("requests and beautifulsoup4 are required for URL processing. Install them with: pip install requests beautifulsoup4")
except requests.RequestException as e:
raise NetworkError(f"Failed to fetch URL {file_path}: {str(e)}")
except Exception as e:
if isinstance(e, (NetworkError, ConversionError)):
raise
raise ConversionError(f"Failed to process URL {file_path}: {str(e)}")
def _detect_file_from_url(self, url: str) -> Optional[Dict[str, Any]]:
"""Detect if a URL points to a file and return file information.
Args:
url: URL to check
Returns:
File info dict if it's a file URL, None otherwise
"""
try:
import requests
# Check URL path for file extensions
parsed_url = urlparse(url)
path = parsed_url.path.lower()
# Common file extensions
file_extensions = {
'.pdf': 'pdf',
'.doc': 'doc',
'.docx': 'docx',
'.txt': 'txt',
'.md': 'markdown',
'.html': 'html',
'.htm': 'html',
'.xlsx': 'xlsx',
'.xls': 'xls',
'.csv': 'csv',
'.ppt': 'ppt',
'.pptx': 'pptx',
'.jpg': 'image',
'.jpeg': 'image',
'.png': 'image',
'.gif': 'image',
'.bmp': 'image',
'.tiff': 'image',
'.tif': 'image',
'.webp': 'image'
}
# Check for file extension in URL path
for ext, file_type in file_extensions.items():
if path.endswith(ext):
return {
'file_type': file_type,
'extension': ext,
'filename': os.path.basename(path) or f"downloaded_file{ext}"
}
# If no extension in URL, check content-type header
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Make a HEAD request to check content-type
response = requests.head(url, headers=headers, timeout=10, allow_redirects=True)
if response.status_code == 200:
content_type = response.headers.get('content-type', '').lower()
# Check for file content types
if 'application/pdf' in content_type:
return {'file_type': 'pdf', 'extension': '.pdf', 'filename': 'downloaded_file.pdf'}
elif 'application/msword' in content_type or 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in content_type:
ext = '.docx' if 'openxmlformats' in content_type else '.doc'
return {'file_type': 'doc' if ext == '.doc' else 'docx', 'extension': ext, 'filename': f'downloaded_file{ext}'}
elif 'application/vnd.ms-excel' in content_type or 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' in content_type:
ext = '.xlsx' if 'openxmlformats' in content_type else '.xls'
return {'file_type': 'xlsx' if ext == '.xlsx' else 'xls', 'extension': ext, 'filename': f'downloaded_file{ext}'}
elif 'application/vnd.ms-powerpoint' in content_type or 'application/vnd.openxmlformats-officedocument.presentationml.presentation' in content_type:
ext = '.pptx' if 'openxmlformats' in content_type else '.ppt'
return {'file_type': 'pptx' if ext == '.pptx' else 'ppt', 'extension': ext, 'filename': f'downloaded_file{ext}'}
elif 'text/plain' in content_type:
return {'file_type': 'txt', 'extension': '.txt', 'filename': 'downloaded_file.txt'}
elif 'text/markdown' in content_type:
return {'file_type': 'markdown', 'extension': '.md', 'filename': 'downloaded_file.md'}
elif 'text/html' in content_type:
# HTML could be a web page or a file, check if it's likely a file
if 'attachment' in response.headers.get('content-disposition', '').lower():
return {'file_type': 'html', 'extension': '.html', 'filename': 'downloaded_file.html'}
# If it's HTML but not an attachment, treat as web page
return None
elif any(img_type in content_type for img_type in ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/tiff', 'image/webp']):
# Determine extension from content type
ext_map = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/bmp': '.bmp',
'image/tiff': '.tiff',
'image/webp': '.webp'
}
ext = ext_map.get(content_type, '.jpg')
return {'file_type': 'image', 'extension': ext, 'filename': f'downloaded_file{ext}'}
except requests.RequestException:
# If HEAD request fails, assume it's a web page
pass
except Exception:
pass
return None
def _process_file_url(self, url: str, file_info: Dict[str, Any]) -> ConversionResult:
"""Download and process a file from URL.
Args:
url: URL to download from
file_info: Information about the file
Returns:
ConversionResult containing the processed content
"""
try:
import requests
from ..extractor import DocumentExtractor
# Download the file
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=60, stream=True)
response.raise_for_status()
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=file_info['extension']) as temp_file:
# Write the downloaded content and track size
content_length = 0
for chunk in response.iter_content(chunk_size=8192):
if chunk: # Filter out keep-alive chunks
temp_file.write(chunk)
content_length += len(chunk)
temp_file_path = temp_file.name
try:
# Process the downloaded file using the appropriate processor
extractor = DocumentExtractor()
result = extractor.extract(temp_file_path)
# Add URL metadata to the result
result.metadata.update({
"source_url": url,
"downloaded_filename": file_info['filename'],
"content_type": response.headers.get('content-type', ''),
"content_length": content_length
})
return result
finally:
# Clean up the temporary file
try:
os.unlink(temp_file_path)
except OSError:
pass
except Exception as e:
raise ConversionError(f"Failed to download and process file from URL {url}: {str(e)}")
def _process_web_page(self, url: str) -> ConversionResult:
"""Process a web page URL.
Args:
url: URL to process
Returns:
ConversionResult containing the processed content
"""
try:
from bs4 import BeautifulSoup
import requests
# Fetch the web page
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# Parse the HTML
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Extract text content
content_parts = []
# Get title
title = soup.find('title')
if title:
content_parts.append(f"# {title.get_text().strip()}\n")
# Get main content
main_content = self._extract_main_content(soup)
if main_content:
content_parts.append(main_content)
else:
# Fallback to body text
body = soup.find('body')
if body:
content_parts.append(body.get_text())
content = '\n'.join(content_parts)
# Clean up the content
content = self._clean_content(content)
metadata = {
"url": url,
"status_code": response.status_code,
"content_type": response.headers.get('content-type', ''),
"content_length": len(response.content),
"processor": self.__class__.__name__
}
return ConversionResult(content, metadata)
except Exception as e:
raise ConversionError(f"Failed to process web page {url}: {str(e)}")
def _is_url(self, text: str) -> bool:
"""Check if the text looks like a URL.
Args:
text: Text to check
Returns:
True if text looks like a URL
"""
try:
result = urlparse(text)
return all([result.scheme, result.netloc])
except Exception:
return False
def _extract_main_content(self, soup) -> str:
"""Extract main content from the HTML.
Args:
soup: BeautifulSoup object
Returns:
Extracted main content
"""
# Try to find main content areas
main_selectors = [
'main',
'[role="main"]',
'.main-content',
'.content',
'#content',
'article',
'.post-content',
'.entry-content'
]
for selector in main_selectors:
element = soup.select_one(selector)
if element:
return element.get_text()
# If no main content found, return empty string
return ""
def _clean_content(self, content: str) -> str:
"""Clean up the extracted web content.
Args:
content: Raw web text content
Returns:
Cleaned text content
"""
# Remove excessive whitespace and normalize
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Remove excessive whitespace
line = ' '.join(line.split())
if line.strip():
cleaned_lines.append(line)
# Join lines, inserting a blank line before headers for spacing.
# (A plain str.replace('# ', ...) would also split '## ' headers and any
# in-text '# ' occurrences, so headers are handled line by line instead.)
spaced_lines = []
for line in cleaned_lines:
    if line.startswith('#') and spaced_lines:
        spaced_lines.append('')
    spaced_lines.append(line)
return '\n'.join(spaced_lines).strip()
```
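`URLProcessor` decides between "file download" and "web page" by looking at the URL path and, failing that, at a HEAD request's Content-Type. A minimal sketch of that sniff, assuming requests is installed (the URL is a placeholder):
```py
# Content-type sniff via a HEAD request, as used by URLProcessor above.
import requests

def sniff_content_type(url: str) -> str:
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.head(url, headers=headers, timeout=10, allow_redirects=True)
    response.raise_for_status()
    return response.headers.get("content-type", "").lower()

# sniff_content_type("https://example.com/report.pdf")  # e.g. "application/pdf"
```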
## /docstrange/services/__init__.py
```py path="/docstrange/services/__init__.py"
"""Services for local LLM processing."""
from .ollama_service import OllamaFieldExtractor
__all__ = ["OllamaFieldExtractor"]
```
## /docstrange/static/logo_clean.png
Binary file available at https://raw.githubusercontent.com/NanoNets/docstrange/refs/heads/main/docstrange/static/logo_clean.png
## /docstrange/utils/__init__.py
```py path="/docstrange/utils/__init__.py"
"""Utility functions for the LLM extractor."""
from .gpu_utils import (
is_gpu_available,
get_gpu_info,
should_use_gpu_processor,
get_processor_preference
)
__all__ = [
"is_gpu_available",
"get_gpu_info",
"should_use_gpu_processor",
"get_processor_preference"
]
```
## /docstrange/utils/gpu_utils.py
```py path="/docstrange/utils/gpu_utils.py"
"""GPU utility functions for detecting and managing GPU availability."""
import logging
from typing import Dict, Optional
logger = logging.getLogger(__name__)
def is_gpu_available() -> bool:
"""Check if GPU is available for deep learning models.
Returns:
True if GPU is available, False otherwise
"""
try:
import torch
if torch.cuda.is_available():
gpu_count = torch.cuda.device_count()
gpu_name = torch.cuda.get_device_name(0) if gpu_count > 0 else "Unknown"
logger.info(f"GPU detected: {gpu_name} (count: {gpu_count})")
return True
else:
logger.info("No CUDA GPU available")
return False
except ImportError:
logger.info("PyTorch not available, assuming no GPU")
return False
except Exception as e:
logger.warning(f"Error checking GPU availability: {e}")
return False
def get_gpu_info() -> Dict:
"""Get detailed GPU information.
Returns:
Dictionary with GPU information
"""
info = {
"available": False,
"count": 0,
"names": [],
"memory": []
}
try:
import torch
if torch.cuda.is_available():
info["available"] = True
info["count"] = torch.cuda.device_count()
info["names"] = [torch.cuda.get_device_name(i) for i in range(info["count"])]
info["memory"] = [torch.cuda.get_device_properties(i).total_memory for i in range(info["count"])]
except ImportError:
pass
except Exception as e:
logger.warning(f"Error getting GPU info: {e}")
return info
def should_use_gpu_processor() -> bool:
"""Determine if GPU processor should be used based on GPU availability.
Returns:
True if GPU processor should be used, False otherwise
"""
return is_gpu_available()
def get_processor_preference() -> str:
"""Get the preferred processor type based on system capabilities.
Returns:
'gpu' if GPU is available
Raises:
RuntimeError: If GPU is not available
"""
if should_use_gpu_processor():
return 'gpu'
else:
raise RuntimeError(
"GPU is not available. Please ensure CUDA is installed and a compatible GPU is present, "
"or use cloud processing mode."
)
```
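Example usage of these helpers; note that `is_gpu_available()` quietly returns False when PyTorch is missing, while `get_processor_preference()` raises when no CUDA device is found:
```py
# Querying GPU availability before choosing a processing mode.
from docstrange.utils import get_gpu_info, is_gpu_available

if is_gpu_available():
    info = get_gpu_info()
    print(f"GPU: {info['names'][0]}, memory: {info['memory'][0] / 1e9:.1f} GB")
else:
    print("No CUDA GPU detected; consider cloud processing mode.")
```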
## /example.py
```py path="/example.py"
from docstrange import DocumentExtractor
file_path = "sample_documents/invoice.pdf"
extractor = DocumentExtractor()
result = extractor.extract(file_path).extract_data(specified_fields=[
"total_amount",
"date",
"vendor_name",
"invoice_number"
])
print(result)
print("📝=============================== JSON Output:===============================")
result = extractor.extract(file_path).extract_data()
print(result)
print("\n📝=============================== Specific Field :===============================")
result = extractor.extract(file_path)
specific_fields = result.extract_data(specified_fields=[
"total_amount",
"date",
"vendor_name",
"invoice_number"
])
print(specific_fields)
print("\n📝=============================== JSON Schema Extraction:===============================")
schema = {
"invoice_number": "string",
"total_amount": "number",
"vendor_name": "string",
"items": [{
"description": "string",
"amount": "number"
}]
}
structured_data = result.extract_data(json_schema=schema)
print(structured_data)
```
## /examples/test.py
```py path="/examples/test.py"
#!/usr/bin/env python3
from docstrange import FileConverter
file_path = "sample_documents/sample.png"
converter = FileConverter()
result = converter.convert(file_path).to_markdown()
print("📝=============================== Markdown Output:===============================")
print(result)
```
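This example still uses the `FileConverter`/`to_markdown()` naming; if that alias is not exported by the current package, the equivalent with the `DocumentExtractor` API used elsewhere in this repo would presumably be:
```py
# Hypothetical DocumentExtractor equivalent of the FileConverter example above,
# assuming extract_markdown() is the current counterpart of to_markdown().
from docstrange import DocumentExtractor

extractor = DocumentExtractor()
result = extractor.extract("sample_documents/sample.png").extract_markdown()
print(result)
```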
## /mcp_server_module/__init__.py
```py path="/mcp_server_module/__init__.py"
"""MCP Server for docstrange - intelligent PDF document processing."""
from .server import DocstrangeServer, main
__all__ = ["DocstrangeServer", "main"]
```
## /mcp_server_module/__main__.py
```py path="/mcp_server_module/__main__.py"
"""Main entry point for MCP server module."""
import asyncio
from .server import main
if __name__ == "__main__":
asyncio.run(main())
```
## /mcp_server_module/claude_desktop_config.json
```json path="/mcp_server_module/claude_desktop_config.json"
{
"mcpServers": {
"docstrange": {
"command": "/Users/prathameshjuvatkar/.pyenv/shims/python3",
"args": ["-m", "mcp_server_module"],
"env": {
"PYTHONPATH": "/Users/prathameshjuvatkar/workspace/docstrange"
}
}
}
}
```
## /scripts/README.md
# S3 Model Hosting Setup
This directory contains scripts for managing model hosting on Nanonets S3.
## Model Hosting Infrastructure
docstrange uses a dual hosting system:
1. **Primary**: Nanonets S3 bucket (`public-vlms`) - faster, no authentication required
2. **Fallback**: Hugging Face Hub - original source, requires authentication for some models
## Files
- `prepare_s3_models.py` - Downloads models from Hugging Face and packages them for S3 upload
## Current S3 Setup
- **Bucket**: `public-vlms`
- **Region**: `us-west-2`
- **Base URL**: `https://public-vlms.s3-us-west-2.amazonaws.com/docstrange/`
### Hosted Models
1. **Layout Model** (`layout-model-v2.2.0.tar.gz`) - 151.8 MB
- Source: `ds4sd/docling-models` model_artifacts/layout
- Used for: Document layout detection and segmentation
2. **TableFormer Model** (`tableformer-model-v2.2.0.tar.gz`) - 317.5 MB
- Source: `ds4sd/docling-models` model_artifacts/tableformer
- Used for: Table structure recognition and extraction
3. **EasyOCR** - Handled automatically by the EasyOCR library
- No S3 hosting needed - downloads its own models
## Usage
### One-time Setup (Already Completed)
1. Run the preparation script:
```bash
python scripts/prepare_s3_models.py
```
2. Upload to S3:
```bash
aws s3 cp dist/layout-model-v2.2.0.tar.gz s3://public-vlms/docstrange/ --acl public-read
aws s3 cp dist/tableformer-model-v2.2.0.tar.gz s3://public-vlms/docstrange/ --acl public-read
```
### Model Download Behavior
The `ModelDownloader` class automatically:
1. Tries S3 first (faster, no auth required)
2. Falls back to Hugging Face if S3 fails
3. Provides graceful degradation if no models are available (a simplified sketch of this flow follows below)
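For illustration only, a heavily simplified version of this flow; the actual implementation lives in `ModelDownloader`, and the helper name, destination directory, and archive filename below are placeholders:
```py
# Simplified S3-first / Hugging Face-fallback download (illustrative sketch).
import os
import urllib.request

S3_BASE = "https://public-vlms.s3-us-west-2.amazonaws.com/docstrange/"

def fetch_model(archive_name: str, dest_dir: str = "./models") -> str:
    os.makedirs(dest_dir, exist_ok=True)
    dest = os.path.join(dest_dir, archive_name)
    prefer_hf = os.environ.get("document_extractor_PREFER_HF", "").lower() == "true"
    if not prefer_hf:
        try:
            urllib.request.urlretrieve(S3_BASE + archive_name, dest)  # S3 first
            return dest
        except Exception:
            pass  # fall back to Hugging Face below
    from huggingface_hub import snapshot_download
    return snapshot_download(repo_id="ds4sd/docling-models")

# fetch_model("layout-model-v2.2.0.tar.gz")
```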
## Environment Variables
- `document_extractor_PREFER_HF=true` - Force use of Hugging Face instead of S3
## Benefits of S3 Hosting
- ✅ **No Authentication Required** - Works out of the box
- ✅ **Faster Downloads** - Optimized S3 delivery
- ✅ **High Availability** - Redundant storage
- ✅ **Cost Effective** - Public bucket with efficient delivery
- ✅ **Fallback Support** - Automatic Hugging Face fallback
## /scripts/__init__.py
```py path="/scripts/__init__.py"
# Scripts for Document Data Extractor development and deployment
```
## /tests/debug_ocr_provider.py
```py path="/tests/debug_ocr_provider.py"
#!/usr/bin/env python3
import logging
from docstrange import DocumentExtractor
from docstrange.config import InternalConfig
# Set up detailed logging
logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
print("=== OCR Provider Debug ===")
print(f"Default OCR provider: {InternalConfig.ocr_provider}")
file_path = "sample_documents/sample.png"
print(f"\n=== Testing with file: {file_path} ===")
extractor = DocumentExtractor()
# Test the conversion
result = extractor.extract(file_path).extract_markdown()
print("\n📝=============================== Markdown Output:===============================")
print(result)
```