```
├── .github/
│   └── workflows/
│       ├── docs.yml
│       ├── tests-frontend.yml
│       └── tests.yml
├── .gitignore
├── CNAME
├── LICENSE
├── README.md
└── backend/
    └── fastrtc/
        ├── __init__.py
        ├── credentials.py
        ├── pause_detection/
        │   ├── __init__.py
        │   ├── protocol.py
        │   └── silero.py
        ├── py.typed
        ├── reply_on_pause.py
        ├── reply_on_stopwords.py
        ├── speech_to_text/
        │   ├── __init__.py
        │   ├── stt_.py
        │   └── test_file.wav
        ├── stream.py
        └── templates/
            └── component/
                ├── assets/
                │   └── worker-lPYB70QI.js
                └── index.js
```

## /.github/workflows/docs.yml

```yml path="/.github/workflows/docs.yml"
name: docs

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

permissions:
  contents: write
  pull-requests: write
  deployments: write
  pages: write

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false)
    steps:
      - uses: actions/checkout@v4
      - name: Configure Git Credentials
        run: |
          git config user.name github-actions[bot]
          git config user.email 41898282+github-actions[bot]@users.noreply.github.com
      - uses: actions/setup-python@v5
        with:
          python-version: 3.x
      - run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
      - uses: actions/cache@v4
        with:
          key: mkdocs-material-${{ env.cache_id }}
          path: .cache
          restore-keys: |
            mkdocs-material-
      - run: pip install mkdocs-material mkdocs-llmstxt==0.1.0
      - name: Build docs
        run: mkdocs build
      - name: Deploy to GH Pages (main)
        if: github.event_name == 'push'
        run: mkdocs gh-deploy --force
      - name: Deploy PR Preview
        if: github.event_name == 'pull_request'
        uses: rossjrw/pr-preview-action@v1
        with:
          source-dir: ./site
          preview-branch: gh-pages
          umbrella-dir: pr-preview
          action: auto
```

## /.github/workflows/tests-frontend.yml

```yml path="/.github/workflows/tests-frontend.yml"
name: tests

on: [push, pull_request]

jobs:
  prettier:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 18
      - name: Run prettier
        run: |
          cd frontend
          npm install
          npx prettier --check .
```

## /.github/workflows/tests.yml

```yml path="/.github/workflows/tests.yml"
name: tests

on: [push, pull_request]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      - name: Run linters
        run: |
          pip install ruff pyright
          pip install -e .[dev]
          ruff check .
          ruff format --check --diff .
          pyright
  test:
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest]
        python:
          - '3.10'
          - '3.13'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python }}
      - name: Run tests
        run: |
          python -m pip install -U pip
          pip install '.[dev, tts]'
          python -m pytest --capture=no
        shell: bash
```

## /.gitignore

```gitignore path="/.gitignore"
.eggs/
dist/
*.pyc
__pycache__/
*.py[cod]
*$py.class
__tmp/*
*.pyi
.mypycache
.ruff_cache
node_modules
demo/MobileNetSSD_deploy.caffemodel
demo/MobileNetSSD_deploy.prototxt.txt
demo/scratch
.gradio
.vscode
.DS_Store
.venv*
.env
```

## /CNAME

``` path="/CNAME"
fastrtc.org
```

## /LICENSE

``` path="/LICENSE"
MIT License

Copyright (c) 2024 Freddy Boulton

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```

## /README.md

# FastRTC


The Real-Time Communication Library for Python.

Turn any Python function into a real-time audio and video stream over WebRTC or WebSockets.

## Installation

```bash
pip install fastrtc
```

To use built-in pause detection (see [ReplyOnPause](https://fastrtc.org/userguide/audio/#reply-on-pause)) and text-to-speech (see [Text To Speech](https://fastrtc.org/userguide/audio/#text-to-speech)), install the `vad` and `tts` extras:

```bash
pip install "fastrtc[vad, tts]"
```

## Key Features

- 🗣️ Automatic Voice Detection and Turn Taking built-in; only worry about the logic for responding to the user.
- 💻 Automatic UI - Use the `.ui.launch()` method to launch the WebRTC-enabled built-in Gradio UI.
- 🔌 Automatic WebRTC Support - Use the `.mount(app)` method to mount the stream on a FastAPI app and get a WebRTC endpoint for your own frontend!
- ⚡️ WebSocket Support - Use the `.mount(app)` method to mount the stream on a FastAPI app and get a WebSocket endpoint for your own frontend!
- 📞 Automatic Telephone Support - Use the `fastphone()` method of the stream to launch the application and get a free temporary phone number!
- 🤖 Completely customizable backend - A `Stream` can easily be mounted on a FastAPI app so you can easily extend it to fit your production application. See the [Talk To Claude](https://huggingface.co/spaces/fastrtc/talk-to-claude) demo for an example of how to serve a custom JS frontend.

## Docs

[https://fastrtc.org](https://fastrtc.org)
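FastRTC streams run over WebRTC, so when deploying anywhere other than your own machine (for example Google Colab or Hugging Face Spaces) the `Stream` needs TURN credentials, passed through its `rtc_configuration` argument (see [https://fastrtc.org/deployment/](https://fastrtc.org/deployment/)). A minimal sketch, assuming a valid `HF_TOKEN` (or Cloudflare TURN keys) is available in the environment:

```python
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_cloudflare_turn_credentials


def echo(audio: tuple[int, np.ndarray]):
    # Any handler works here; this one just echoes the caller's audio back.
    yield audio


stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive",
    # rtc_configuration accepts a dict of ICE servers or a callable that
    # returns one; this helper reads HF_TOKEN (or CLOUDFLARE_TURN_KEY_ID /
    # CLOUDFLARE_TURN_KEY_API_TOKEN) from the environment.
    rtc_configuration=get_cloudflare_turn_credentials,
)
```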

## Examples

See the [Cookbook](https://fastrtc.org/cookbook/) for examples of how to use the library. Each entry links to a hosted demo and its source code.

- 🗣️👀 **Gemini Audio Video Chat** - Stream BOTH your webcam video and audio feeds to Google Gemini. You can also upload images to augment your conversation!
- 🗣️ **Google Gemini Real Time Voice API** - Talk to Gemini in real time using Google's voice API.
- 🗣️ **OpenAI Real Time Voice API** - Talk to ChatGPT in real time using OpenAI's voice API.
- 🤖 **Hello Computer** - Say "computer" before asking your question!
- 🤖 **Llama Code Editor** - Create and edit HTML pages with just your voice! Powered by SambaNova Systems.
- 🗣️ **Talk to Claude** - Use the Anthropic and Play.Ht APIs to have an audio conversation with Claude.
- 🎵 **Whisper Transcription** - Have Whisper transcribe your speech in real time! (A sketch of this pattern follows the list.)
- 📷 **YOLOv10 Object Detection** - Run the YOLOv10 model on a user's webcam stream in real time!
- 🗣️ **Kyutai Moshi** - Kyutai's Moshi is a novel speech-to-speech model for modeling human conversations.
- 🗣️ **Hello Llama: Stop Word Detection** - A code editor built with Llama 3.3 70B that is triggered by the phrase "Hello Llama". Build a Siri-like coding assistant in 100 lines of code!

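The Whisper transcription entry above follows a pattern you can reuse directly: run speech-to-text on each utterance the pause detector hands you and push the text to the UI with `AdditionalOutputs`. Below is a minimal sketch using the bundled Moonshine model from `get_stt_model` (installed with the `stt` extra; `ReplyOnPause` also needs the `vad` extra) instead of Whisper. The `additional_outputs_handler` signature is assumed here to receive the previous and the new textbox values:

```python
import gradio as gr
import numpy as np
from fastrtc import AdditionalOutputs, ReplyOnPause, Stream, get_stt_model

stt_model = get_stt_model()  # "moonshine/base" by default


def transcribe(audio: tuple[int, np.ndarray]):
    # Called with everything the user said since the last detected pause.
    text = stt_model.stt(audio)
    # No audio is yielded back; only the transcript is sent to the UI.
    yield AdditionalOutputs(text)


transcript = gr.Textbox(label="Transcript")

stream = Stream(
    handler=ReplyOnPause(transcribe),
    modality="audio",
    mode="send-receive",
    additional_outputs=[transcript],
    # Assumed handler signature: (previous textbox value, new text) -> value.
    additional_outputs_handler=lambda old, new: ((old or "") + " " + new).strip(),
)
```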
## Usage

This is a shortened version of the official [usage guide](https://freddyaboulton.github.io/gradio-webrtc/user-guide/).

- `.ui.launch()`: Launch a built-in UI for easily testing and sharing your stream. Built with [Gradio](https://www.gradio.app/).
- `.fastphone()`: Get a free temporary phone number to call into your stream. Hugging Face token required.
- `.mount(app)`: Mount the stream on a [FastAPI](https://fastapi.tiangolo.com/) app. Perfect for integrating with your already existing production system.

## Quickstart

### Echo Audio

```python
from fastrtc import Stream, ReplyOnPause
import numpy as np

def echo(audio: tuple[int, np.ndarray]):
    # The function will be passed the audio until the user pauses
    # Implement any iterator that yields audio
    # See "LLM Voice Chat" for a more complete example
    yield audio

stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive",
)
```

### LLM Voice Chat

```py
from fastrtc import (
    ReplyOnPause,
    AdditionalOutputs,
    Stream,
    audio_to_bytes,
    aggregate_bytes_to_16bit,
)
import gradio as gr
import numpy as np
from groq import Groq
import anthropic
from elevenlabs import ElevenLabs

groq_client = Groq()
claude_client = anthropic.Anthropic()
tts_client = ElevenLabs()


# See "Talk to Claude" in Cookbook for an example of how to keep
# track of the chat history.
def response(
    audio: tuple[int, np.ndarray],
):
    prompt = groq_client.audio.transcriptions.create(
        file=("audio-file.mp3", audio_to_bytes(audio)),
        model="whisper-large-v3-turbo",
        response_format="verbose_json",
    ).text
    response = claude_client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    response_text = " ".join(
        block.text
        for block in response.content
        if getattr(block, "type", None) == "text"
    )
    iterator = tts_client.text_to_speech.convert_as_stream(
        text=response_text,
        voice_id="JBFqnCBsd6RMkjVDRZzb",
        model_id="eleven_multilingual_v2",
        output_format="pcm_24000",
    )
    for chunk in aggregate_bytes_to_16bit(iterator):
        audio_array = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
        yield (24000, audio_array)


stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=ReplyOnPause(response),
)
```

### Webcam Stream

```python
from fastrtc import Stream
import numpy as np

def flip_vertically(image):
    return np.flip(image, axis=0)

stream = Stream(
    handler=flip_vertically,
    modality="video",
    mode="send-receive",
)
```

### Object Detection

```python
from fastrtc import Stream
import gradio as gr
import cv2
from huggingface_hub import hf_hub_download
from .inference import YOLOv10

model_file = hf_hub_download(
    repo_id="onnx-community/yolov10n", filename="onnx/model.onnx"
)

# git clone https://huggingface.co/spaces/fastrtc/object-detection
# for YOLOv10 implementation
model = YOLOv10(model_file)

def detection(image, conf_threshold=0.3):
    image = cv2.resize(image, (model.input_width, model.input_height))
    new_image = model.detect_objects(image, conf_threshold)
    return cv2.resize(new_image, (500, 500))

stream = Stream(
    handler=detection,
    modality="video",
    mode="send-receive",
    additional_inputs=[
        gr.Slider(minimum=0, maximum=1, step=0.01, value=0.3)
    ],
)
```

## Running the Stream

### Gradio

```py
stream.ui.launch()
```

### Telephone (Audio Only)

```py
stream.fastphone()
```

### FastAPI

```py
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()
stream.mount(app)

# Optional: Add routes
@app.get("/")
async def _():
    return HTMLResponse(content=open("index.html").read())

# uvicorn app:app --host 0.0.0.0 --port 8000
```

## /backend/fastrtc/__init__.py

```py
path="/backend/fastrtc/__init__.py" from .credentials import ( get_cloudflare_turn_credentials, get_cloudflare_turn_credentials_async, get_hf_turn_credentials, get_hf_turn_credentials_async, get_turn_credentials, get_turn_credentials_async, get_twilio_turn_credentials, ) from .pause_detection import ( ModelOptions, PauseDetectionModel, SileroVadOptions, get_silero_model, ) from .reply_on_pause import AlgoOptions, ReplyOnPause from .reply_on_stopwords import ReplyOnStopWords from .speech_to_text import MoonshineSTT, get_stt_model from .stream import Stream, UIArgs from .text_to_speech import ( CartesiaTTSOptions, KokoroTTSOptions, get_tts_model, ) from .tracks import ( AsyncAudioVideoStreamHandler, AsyncStreamHandler, AudioEmitType, AudioVideoStreamHandler, StreamHandler, VideoEmitType, VideoStreamHandler, ) from .utils import ( AdditionalOutputs, CloseStream, Warning, WebRTCError, aggregate_bytes_to_16bit, async_aggregate_bytes_to_16bit, audio_to_bytes, audio_to_file, audio_to_float32, audio_to_int16, get_current_context, wait_for_item, ) from .webrtc import ( WebRTC, ) __all__ = [ "AsyncStreamHandler", "AudioVideoStreamHandler", "AudioEmitType", "AsyncAudioVideoStreamHandler", "AlgoOptions", "AdditionalOutputs", "aggregate_bytes_to_16bit", "async_aggregate_bytes_to_16bit", "audio_to_bytes", "audio_to_file", "audio_to_float32", "audio_to_int16", "get_hf_turn_credentials", "get_twilio_turn_credentials", "get_turn_credentials", "ReplyOnPause", "ReplyOnStopWords", "SileroVadOptions", "get_stt_model", "MoonshineSTT", "StreamHandler", "Stream", "VideoEmitType", "WebRTC", "WebRTCError", "Warning", "get_tts_model", "KokoroTTSOptions", "get_cloudflare_turn_credentials_async", "get_hf_turn_credentials_async", "get_turn_credentials_async", "get_cloudflare_turn_credentials", "wait_for_item", "UIArgs", "ModelOptions", "PauseDetectionModel", "get_silero_model", "SileroVadOptions", "VideoStreamHandler", "CloseStream", "get_current_context", "CartesiaTTSOptions", ] ``` ## /backend/fastrtc/credentials.py ```py path="/backend/fastrtc/credentials.py" import os import warnings from typing import Literal import httpx CLOUDFLARE_FASTRTC_TURN_URL = "https://turn.fastrtc.org/credentials" async_httpx_client = httpx.AsyncClient() def _format_response(response): if response.is_success: return response.json() else: raise Exception( f"Failed to get TURN credentials: {response.status_code} {response.text}" ) def get_hf_turn_credentials(token=None, ttl=600): """Retrieves TURN credentials from Hugging Face (deprecated). This function fetches TURN server credentials using a Hugging Face token. It is deprecated and `get_cloudflare_turn_credentials` should be used instead. Args: token (str, optional): Hugging Face API token. Defaults to None, in which case the HF_TOKEN environment variable is used. ttl (int, optional): Time-to-live for the credentials in seconds. Defaults to 600. Returns: dict: A dictionary containing the TURN credentials. Raises: ValueError: If no token is provided and the HF_TOKEN environment variable is not set. Exception: If the request to the TURN server fails. """ warnings.warn( "get_hf_turn_credentials is deprecated. 
Use get_cloudflare_turn_credentials instead.", UserWarning, ) if token is None: token = os.getenv("HF_TOKEN") if token is None: raise ValueError( "HF_TOKEN environment variable must be set or token must be provided to use get_hf_turn_credentials" ) response = httpx.get( CLOUDFLARE_FASTRTC_TURN_URL, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json", }, params={"ttl": ttl}, ) return _format_response(response) async def get_hf_turn_credentials_async( token=None, ttl=600, client: httpx.AsyncClient | None = None ): """Asynchronously retrieves TURN credentials from Hugging Face (deprecated). This function asynchronously fetches TURN server credentials using a Hugging Face token. It is deprecated and `get_cloudflare_turn_credentials_async` should be used instead. Args: token (str, optional): Hugging Face API token. Defaults to None, in which case the HF_TOKEN environment variable is used. ttl (int, optional): Time-to-live for the credentials in seconds. Defaults to 600. client (httpx.AsyncClient | None, optional): An existing httpx async client to use for the request. If None, a default client is used. Defaults to None. Returns: dict: A dictionary containing the TURN credentials. Raises: ValueError: If no token is provided and the HF_TOKEN environment variable is not set. Exception: If the request to the TURN server fails. """ warnings.warn( "get_hf_turn_credentials_async is deprecated. Use get_cloudflare_turn_credentials_async instead.", UserWarning, ) if client is None: client = async_httpx_client if token is None: token = os.getenv("HF_TOKEN") if token is None: raise ValueError( "HF_TOKEN environment variable must be set or token must be provided to use get_hf_turn_credentials" ) async with client: response = await client.get( "https://turn.fastrtc.org/credentials", headers={"Authorization": f"Bearer {token}"}, params={"ttl": ttl}, ) return _format_response(response) def get_cloudflare_turn_credentials( turn_key_id=None, turn_key_api_token=None, hf_token=None, ttl=600 ): """Retrieves TURN credentials from Cloudflare or Hugging Face. Fetches TURN server credentials either directly from Cloudflare using API keys or via the Hugging Face TURN endpoint using an HF token. The HF token method takes precedence if provided. Args: turn_key_id (str, optional): Cloudflare TURN key ID. Defaults to None, in which case the CLOUDFLARE_TURN_KEY_ID environment variable is used. turn_key_api_token (str, optional): Cloudflare TURN key API token. Defaults to None, in which case the CLOUDFLARE_TURN_KEY_API_TOKEN environment variable is used. hf_token (str, optional): Hugging Face API token. If provided, this method is used instead of Cloudflare keys. Defaults to None, in which case the HF_TOKEN environment variable is used. ttl (int, optional): Time-to-live for the credentials in seconds. Defaults to 600. Returns: dict: A dictionary containing the TURN credentials (ICE servers). Raises: ValueError: If neither HF token nor Cloudflare keys (either as arguments or environment variables) are provided. Exception: If the request to the credential server fails. 
""" if hf_token is None: hf_token = os.getenv("HF_TOKEN") if hf_token is not None: return httpx.get( CLOUDFLARE_FASTRTC_TURN_URL, headers={"Authorization": f"Bearer {hf_token}"}, params={"ttl": ttl}, ).json() else: if turn_key_id is None or turn_key_api_token is None: turn_key_id = os.getenv("CLOUDFLARE_TURN_KEY_ID") turn_key_api_token = os.getenv("CLOUDFLARE_TURN_KEY_API_TOKEN") if turn_key_id is None or turn_key_api_token is None: raise ValueError( "HF_TOKEN or CLOUDFLARE_TURN_KEY_ID and CLOUDFLARE_TURN_KEY_API_TOKEN must be set to use get_cloudflare_turn_credentials_sync" ) response = httpx.post( f"https://rtc.live.cloudflare.com/v1/turn/keys/{turn_key_id}/credentials/generate-ice-servers", headers={ "Authorization": f"Bearer {turn_key_api_token}", "Content-Type": "application/json", }, json={"ttl": ttl}, ) if response.is_success: return response.json() else: raise Exception( f"Failed to get TURN credentials: {response.status_code} {response.text}" ) async def get_cloudflare_turn_credentials_async( turn_key_id=None, turn_key_api_token=None, hf_token=None, ttl=600, client: httpx.AsyncClient | None = None, ): """Asynchronously retrieves TURN credentials from Cloudflare or Hugging Face. Asynchronously fetches TURN server credentials either directly from Cloudflare using API keys or via the Hugging Face TURN endpoint using an HF token. The HF token method takes precedence if provided. Args: turn_key_id (str, optional): Cloudflare TURN key ID. Defaults to None, in which case the CLOUDFLARE_TURN_KEY_ID environment variable is used. turn_key_api_token (str, optional): Cloudflare TURN key API token. Defaults to None, in which case the CLOUDFLARE_TURN_KEY_API_TOKEN environment variable is used. hf_token (str, optional): Hugging Face API token. If provided, this method is used instead of Cloudflare keys. Defaults to None, in which case the HF_TOKEN environment variable is used. ttl (int, optional): Time-to-live for the credentials in seconds. Defaults to 600. client (httpx.AsyncClient | None, optional): An existing httpx async client to use for the request. If None, a new client is created per request. Defaults to None. Returns: dict: A dictionary containing the TURN credentials (ICE servers). Raises: ValueError: If neither HF token nor Cloudflare keys (either as arguments or environment variables) are provided. Exception: If the request to the credential server fails. 
""" if client is None: client = async_httpx_client if hf_token is None: hf_token = os.getenv("HF_TOKEN", "").strip() if hf_token is not None: async with httpx.AsyncClient() as client: response = await client.get( CLOUDFLARE_FASTRTC_TURN_URL, headers={"Authorization": f"Bearer {hf_token}"}, params={"ttl": ttl}, ) return _format_response(response) else: if turn_key_id is None or turn_key_api_token is None: turn_key_id = os.getenv("CLOUDFLARE_TURN_KEY_ID") turn_key_api_token = os.getenv("CLOUDFLARE_TURN_KEY_API_TOKEN") if turn_key_id is None or turn_key_api_token is None: raise ValueError( "HF_TOKEN or CLOUDFLARE_TURN_KEY_ID and CLOUDFLARE_TURN_KEY_API_TOKEN must be set to use get_cloudflare_turn_credentials" ) async with httpx.AsyncClient() as client: response = await client.post( f"https://rtc.live.cloudflare.com/v1/turn/keys/{turn_key_id}/credentials/generate-ice-servers", headers={ "Authorization": f"Bearer {turn_key_api_token}", "Content-Type": "application/json", }, json={"ttl": ttl}, ) if response.is_success: return response.json() else: raise Exception( f"Failed to get TURN credentials: {response.status_code} {response.text}" ) def get_twilio_turn_credentials(twilio_sid=None, twilio_token=None): """Retrieves TURN credentials from Twilio. Uses the Twilio REST API to generate temporary TURN credentials. Requires the `twilio` package to be installed. Args: twilio_sid (str, optional): Twilio Account SID. Defaults to None, in which case the TWILIO_ACCOUNT_SID environment variable is used. twilio_token (str, optional): Twilio Auth Token. Defaults to None, in which case the TWILIO_AUTH_TOKEN environment variable is used. Returns: dict: A dictionary containing the TURN credentials formatted for WebRTC, including 'iceServers' and 'iceTransportPolicy'. Raises: ImportError: If the `twilio` package is not installed. ValueError: If Twilio credentials (SID and token) are not provided either as arguments or environment variables. TwilioRestException: If the Twilio API request fails. """ try: from twilio.rest import Client except ImportError: raise ImportError("Please install twilio with `pip install twilio`") if not twilio_sid and not twilio_token: twilio_sid = os.environ.get("TWILIO_ACCOUNT_SID") twilio_token = os.environ.get("TWILIO_AUTH_TOKEN") client = Client(twilio_sid, twilio_token) token = client.tokens.create() return { "iceServers": token.ice_servers, "iceTransportPolicy": "relay", } def get_turn_credentials( method: Literal["hf", "twilio", "cloudflare"] = "cloudflare", **kwargs ): """Retrieves TURN credentials from the specified provider. Acts as a dispatcher function to call the appropriate credential retrieval function based on the method specified. Args: method (Literal["hf", "twilio", "cloudflare"], optional): The provider to use. 'hf' uses the deprecated Hugging Face endpoint. 'cloudflare' uses either Cloudflare keys or the HF endpoint. 'twilio' uses the Twilio API. Defaults to "cloudflare". **kwargs: Additional keyword arguments passed directly to the underlying provider-specific function (e.g., `token`, `ttl` for 'hf'; `twilio_sid`, `twilio_token` for 'twilio'; `turn_key_id`, `turn_key_api_token`, `hf_token`, `ttl` for 'cloudflare'). Returns: dict: A dictionary containing the TURN credentials from the chosen provider. Raises: ValueError: If an invalid method is specified. Also raises exceptions from the underlying provider functions (see their docstrings). """ if method == "hf": warnings.warn( "Method 'hf' is deprecated. 
Use 'cloudflare' instead.", UserWarning ) # Ensure only relevant kwargs are passed hf_kwargs = {k: v for k, v in kwargs.items() if k in ["token", "ttl"]} return get_hf_turn_credentials(**hf_kwargs) elif method == "cloudflare": # Ensure only relevant kwargs are passed cf_kwargs = { k: v for k, v in kwargs.items() if k in ["turn_key_id", "turn_key_api_token", "hf_token", "ttl"] } return get_cloudflare_turn_credentials(**cf_kwargs) elif method == "twilio": # Ensure only relevant kwargs are passed twilio_kwargs = { k: v for k, v in kwargs.items() if k in ["twilio_sid", "twilio_token"] } return get_twilio_turn_credentials(**twilio_kwargs) else: raise ValueError("Invalid method. Must be 'hf', 'twilio', or 'cloudflare'") async def get_turn_credentials_async( method: Literal["hf", "twilio", "cloudflare"] = "cloudflare", **kwargs ): """Asynchronously retrieves TURN credentials from the specified provider. Acts as an async dispatcher function to call the appropriate async credential retrieval function based on the method specified. Args: method (Literal["hf", "twilio", "cloudflare"], optional): The provider to use. 'hf' uses the deprecated Hugging Face endpoint. 'cloudflare' uses either Cloudflare keys or the HF endpoint. 'twilio' is not supported asynchronously by this function yet. Defaults to "cloudflare". **kwargs: Additional keyword arguments passed directly to the underlying provider-specific async function (e.g., `token`, `ttl`, `client` for 'hf'; `turn_key_id`, `turn_key_api_token`, `hf_token`, `ttl`, `client` for 'cloudflare'). Returns: dict: A dictionary containing the TURN credentials from the chosen provider. Raises: ValueError: If an invalid or unsupported method is specified (currently 'twilio' is not supported asynchronously here). NotImplementedError: If method 'twilio' is requested. Also raises exceptions from the underlying provider functions (see their docstrings). """ if method == "hf": warnings.warn( "Method 'hf' is deprecated. Use 'cloudflare' instead.", UserWarning ) # Ensure only relevant kwargs are passed hf_kwargs = {k: v for k, v in kwargs.items() if k in ["token", "ttl", "client"]} return await get_hf_turn_credentials_async(**hf_kwargs) elif method == "cloudflare": # Ensure only relevant kwargs are passed cf_kwargs = { k: v for k, v in kwargs.items() if k in ["turn_key_id", "turn_key_api_token", "hf_token", "ttl", "client"] } return await get_cloudflare_turn_credentials_async(**cf_kwargs) elif method == "twilio": # Twilio client library doesn't have a standard async interface for this. # You might need to run the sync version in an executor or use a different library. raise NotImplementedError( "Async retrieval for Twilio credentials is not implemented." ) else: raise ValueError("Invalid method. 
Must be 'hf', 'twilio', or 'cloudflare'") ``` ## /backend/fastrtc/pause_detection/__init__.py ```py path="/backend/fastrtc/pause_detection/__init__.py" from .protocol import ModelOptions, PauseDetectionModel from .silero import SileroVADModel, SileroVadOptions, get_silero_model __all__ = [ "SileroVADModel", "SileroVadOptions", "PauseDetectionModel", "ModelOptions", "get_silero_model", ] ``` ## /backend/fastrtc/pause_detection/protocol.py ```py path="/backend/fastrtc/pause_detection/protocol.py" from typing import Any, Protocol, TypeAlias import numpy as np from numpy.typing import NDArray from ..utils import AudioChunk ModelOptions: TypeAlias = Any class PauseDetectionModel(Protocol): def vad( self, audio: tuple[int, NDArray[np.int16] | NDArray[np.float32]], options: ModelOptions, ) -> tuple[float, list[AudioChunk]]: ... def warmup( self, ) -> None: ... ``` ## /backend/fastrtc/pause_detection/silero.py ```py path="/backend/fastrtc/pause_detection/silero.py" import logging import warnings from dataclasses import dataclass from functools import lru_cache import click import numpy as np from huggingface_hub import hf_hub_download from numpy.typing import NDArray from ..utils import AudioChunk, audio_to_float32 from .protocol import PauseDetectionModel logger = logging.getLogger(__name__) # The code below is adapted from https://github.com/snakers4/silero-vad. # The code below is adapted from https://github.com/gpt-omni/mini-omni/blob/main/utils/vad.py @lru_cache def get_silero_model() -> PauseDetectionModel: """Returns the VAD model instance and warms it up with dummy data.""" # Warm up the model with dummy data try: import importlib.util mod = importlib.util.find_spec("onnxruntime") if mod is None: raise RuntimeError("Install fastrtc[vad] to use ReplyOnPause") except (ValueError, ModuleNotFoundError): raise RuntimeError("Install fastrtc[vad] to use ReplyOnPause") model = SileroVADModel() print(click.style("INFO", fg="green") + ":\t Warming up VAD model.") model.warmup() print(click.style("INFO", fg="green") + ":\t VAD model warmed up.") return model @dataclass class SileroVadOptions: """VAD options. Attributes: threshold: Speech threshold. Silero VAD outputs speech probabilities for each audio chunk, probabilities ABOVE this value are considered as SPEECH. It is better to tune this parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets. min_speech_duration_ms: Final speech chunks shorter min_speech_duration_ms are thrown out. max_speech_duration_s: Maximum duration of speech chunks in seconds. Chunks longer than max_speech_duration_s will be split at the timestamp of the last silence that lasts more than 100ms (if any), to prevent aggressive cutting. Otherwise, they will be split aggressively just before max_speech_duration_s. min_silence_duration_ms: In the end of each speech chunk wait for min_silence_duration_ms before separating it window_size_samples: Audio chunks of window_size_samples size are fed to the silero VAD model. WARNING! Silero VAD models were trained using 512, 1024, 1536 samples for 16000 sample rate. Values other than these may affect model performance!! speech_pad_ms: Final speech chunks are padded by speech_pad_ms each side speech_duration: If the length of the speech is less than this value, a pause will be detected. 
""" threshold: float = 0.5 min_speech_duration_ms: int = 250 max_speech_duration_s: float = float("inf") min_silence_duration_ms: int = 2000 window_size_samples: int = 1024 speech_pad_ms: int = 400 class SileroVADModel: @staticmethod def download_model() -> str: return hf_hub_download( repo_id="freddyaboulton/silero-vad", filename="silero_vad.onnx" ) def __init__(self): try: import onnxruntime except ImportError as e: raise RuntimeError( "Applying the VAD filter requires the onnxruntime package" ) from e path = self.download_model() opts = onnxruntime.SessionOptions() opts.inter_op_num_threads = 1 opts.intra_op_num_threads = 1 opts.log_severity_level = 4 self.session = onnxruntime.InferenceSession( path, providers=["CPUExecutionProvider"], sess_options=opts, ) def get_initial_state(self, batch_size: int): h = np.zeros((2, batch_size, 64), dtype=np.float32) c = np.zeros((2, batch_size, 64), dtype=np.float32) return h, c @staticmethod def collect_chunks(audio: np.ndarray, chunks: list[AudioChunk]) -> np.ndarray: """Collects and concatenates audio chunks.""" if not chunks: return np.array([], dtype=np.float32) return np.concatenate( [audio[chunk["start"] : chunk["end"]] for chunk in chunks] ) def get_speech_timestamps( self, audio: np.ndarray, vad_options: SileroVadOptions, **kwargs, ) -> list[AudioChunk]: """This method is used for splitting long audios into speech chunks using silero VAD. Args: audio: One dimensional float array. vad_options: Options for VAD processing. kwargs: VAD options passed as keyword arguments for backward compatibility. Returns: List of dicts containing begin and end samples of each speech chunk. """ threshold = vad_options.threshold min_speech_duration_ms = vad_options.min_speech_duration_ms max_speech_duration_s = vad_options.max_speech_duration_s min_silence_duration_ms = vad_options.min_silence_duration_ms window_size_samples = vad_options.window_size_samples speech_pad_ms = vad_options.speech_pad_ms if window_size_samples not in [512, 1024, 1536]: warnings.warn( "Unusual window_size_samples! 
Supported window_size_samples:\n" " - [512, 1024, 1536] for 16000 sampling_rate" ) sampling_rate = 16000 min_speech_samples = sampling_rate * min_speech_duration_ms / 1000 speech_pad_samples = sampling_rate * speech_pad_ms / 1000 max_speech_samples = ( sampling_rate * max_speech_duration_s - window_size_samples - 2 * speech_pad_samples ) min_silence_samples = sampling_rate * min_silence_duration_ms / 1000 min_silence_samples_at_max_speech = sampling_rate * 98 / 1000 audio_length_samples = len(audio) state = self.get_initial_state(batch_size=1) speech_probs = [] for current_start_sample in range(0, audio_length_samples, window_size_samples): chunk = audio[ current_start_sample : current_start_sample + window_size_samples ] if len(chunk) < window_size_samples: chunk = np.pad(chunk, (0, int(window_size_samples - len(chunk)))) speech_prob, state = self(chunk, state, sampling_rate) speech_probs.append(speech_prob) triggered = False speeches = [] current_speech = {} neg_threshold = threshold - 0.15 # to save potential segment end (and tolerate some silence) temp_end = 0 # to save potential segment limits in case of maximum segment size reached prev_end = next_start = 0 for i, speech_prob in enumerate(speech_probs): if (speech_prob >= threshold) and temp_end: temp_end = 0 if next_start < prev_end: next_start = window_size_samples * i if (speech_prob >= threshold) and not triggered: triggered = True current_speech["start"] = window_size_samples * i continue if ( triggered and (window_size_samples * i) - current_speech["start"] > max_speech_samples ): if prev_end: current_speech["end"] = prev_end speeches.append(current_speech) current_speech = {} # previously reached silence (< neg_thres) and is still not speech (< thres) if next_start < prev_end: triggered = False else: current_speech["start"] = next_start prev_end = next_start = temp_end = 0 else: current_speech["end"] = window_size_samples * i speeches.append(current_speech) current_speech = {} prev_end = next_start = temp_end = 0 triggered = False continue if (speech_prob < neg_threshold) and triggered: if not temp_end: temp_end = window_size_samples * i # condition to avoid cutting in very short silence if ( window_size_samples * i ) - temp_end > min_silence_samples_at_max_speech: prev_end = temp_end if (window_size_samples * i) - temp_end < min_silence_samples: continue else: current_speech["end"] = temp_end if ( current_speech["end"] - current_speech["start"] ) > min_speech_samples: speeches.append(current_speech) current_speech = {} prev_end = next_start = temp_end = 0 triggered = False continue if ( current_speech and (audio_length_samples - current_speech["start"]) > min_speech_samples ): current_speech["end"] = audio_length_samples speeches.append(current_speech) for i, speech in enumerate(speeches): if i == 0: speech["start"] = int(max(0, speech["start"] - speech_pad_samples)) if i != len(speeches) - 1: silence_duration = speeches[i + 1]["start"] - speech["end"] if silence_duration < 2 * speech_pad_samples: speech["end"] += int(silence_duration // 2) speeches[i + 1]["start"] = int( max(0, speeches[i + 1]["start"] - silence_duration // 2) ) else: speech["end"] = int( min(audio_length_samples, speech["end"] + speech_pad_samples) ) speeches[i + 1]["start"] = int( max(0, speeches[i + 1]["start"] - speech_pad_samples) ) else: speech["end"] = int( min(audio_length_samples, speech["end"] + speech_pad_samples) ) return speeches def warmup(self): for _ in range(10): dummy_audio = np.zeros(102400, dtype=np.float32) self.vad((24000, dummy_audio), 
None) def vad( self, audio: tuple[int, NDArray[np.float32] | NDArray[np.int16]], options: None | SileroVadOptions, ) -> tuple[float, list[AudioChunk]]: sampling_rate, audio_ = audio logger.debug("VAD audio shape input: %s", audio_.shape) try: audio_ = audio_to_float32(audio_) sr = 16000 if sr != sampling_rate: try: import librosa # type: ignore except ImportError as e: raise RuntimeError( "Applying the VAD filter requires the librosa if the input sampling rate is not 16000hz" ) from e audio_ = librosa.resample(audio_, orig_sr=sampling_rate, target_sr=sr) if not options: options = SileroVadOptions() speech_chunks = self.get_speech_timestamps(audio_, options) logger.debug("VAD speech chunks: %s", speech_chunks) audio_ = self.collect_chunks(audio_, speech_chunks) logger.debug("VAD audio shape: %s", audio_.shape) duration_after_vad = audio_.shape[0] / sr return duration_after_vad, speech_chunks except Exception as e: import math import traceback logger.debug("VAD Exception: %s", str(e)) exec = traceback.format_exc() logger.debug("traceback %s", exec) return math.inf, [] def __call__(self, x, state, sr: int): if len(x.shape) == 1: x = np.expand_dims(x, 0) if len(x.shape) > 2: raise ValueError( f"Too many dimensions for input audio chunk {len(x.shape)}" ) if sr / x.shape[1] > 31.25: # type: ignore raise ValueError("Input audio chunk is too short") h, c = state ort_inputs = { "input": x, "h": h, "c": c, "sr": np.array(sr, dtype="int64"), } out, h, c = self.session.run(None, ort_inputs) state = (h, c) return out, state ``` ## /backend/fastrtc/py.typed ```typed path="/backend/fastrtc/py.typed" ``` ## /backend/fastrtc/reply_on_pause.py ```py path="/backend/fastrtc/reply_on_pause.py" import asyncio import inspect from collections.abc import AsyncGenerator, Callable, Generator from dataclasses import dataclass, field from logging import getLogger from threading import Event from typing import Any, Literal, cast import numpy as np from numpy.typing import NDArray from .pause_detection import ModelOptions, PauseDetectionModel, get_silero_model from .tracks import EmitType, StreamHandler from .utils import AdditionalOutputs, create_message, split_output logger = getLogger(__name__) @dataclass class AlgoOptions: """Algorithm options.""" audio_chunk_duration: float = 0.6 started_talking_threshold: float = 0.2 speech_threshold: float = 0.1 @dataclass class AppState: stream: np.ndarray | None = None sampling_rate: int = 0 pause_detected: bool = False started_talking: bool = False responding: bool = False stopped: bool = False buffer: np.ndarray | None = None responded_audio: bool = False interrupted: asyncio.Event = field(default_factory=asyncio.Event) def new(self): return AppState() ReplyFnGenerator = ( Callable[ [tuple[int, NDArray[np.int16]], Any], Generator[EmitType, None, None], ] | Callable[ [tuple[int, NDArray[np.int16]]], Generator[EmitType, None, None], ] | Callable[ [tuple[int, NDArray[np.int16]]], AsyncGenerator[EmitType, None], ] | Callable[ [tuple[int, NDArray[np.int16]], Any], AsyncGenerator[EmitType, None], ] ) async def iterate(generator: Generator) -> Any: return next(generator) class ReplyOnPause(StreamHandler): """ A stream handler that processes incoming audio, detects pauses, and triggers a reply function (`fn`) when a pause is detected. This handler accumulates audio chunks, uses a Voice Activity Detection (VAD) model to determine speech segments, and identifies pauses based on configurable thresholds. 
Once a pause is detected after speech has started, it calls the provided generator function `fn` with the accumulated audio. It can optionally run a `startup_fn` at the beginning and supports interruption of the reply function if new audio arrives. Attributes: fn (ReplyFnGenerator): The generator function to call when a pause is detected. startup_fn (Callable | None): An optional function to run at startup. algo_options (AlgoOptions): Configuration for the pause detection algorithm. model_options (ModelOptions | None): Configuration for the VAD model. can_interrupt (bool): Whether incoming audio can interrupt the `fn` execution. expected_layout (Literal["mono", "stereo"]): Expected audio channel layout. output_sample_rate (int): Sample rate for the output audio from `fn`. input_sample_rate (int): Expected sample rate of the input audio. model (PauseDetectionModel): The VAD model instance. state (AppState): The current state of the pause detection logic. generator (Generator | AsyncGenerator | None): The active generator instance from `fn`. event (Event): Threading event used to signal pause detection. loop (asyncio.AbstractEventLoop): The asyncio event loop. """ def __init__( self, fn: ReplyFnGenerator, startup_fn: Callable | None = None, algo_options: AlgoOptions | None = None, model_options: ModelOptions | None = None, can_interrupt: bool = True, expected_layout: Literal["mono", "stereo"] = "mono", output_sample_rate: int = 24000, output_frame_size: int | None = None, # Deprecated input_sample_rate: int = 48000, model: PauseDetectionModel | None = None, ): """ Initializes the ReplyOnPause handler. Args: fn: The generator function to execute upon pause detection. It receives `(sample_rate, audio_array)` and optionally `*args`. startup_fn: An optional function to run once at the beginning. algo_options: Options for the pause detection algorithm. model_options: Options for the VAD model. can_interrupt: If True, incoming audio during `fn` execution will stop the generator and process the new audio. expected_layout: Expected input audio layout ('mono' or 'stereo'). output_sample_rate: The sample rate expected for audio yielded by `fn`. output_frame_size: Deprecated. input_sample_rate: The expected sample rate of incoming audio. model: An optional pre-initialized VAD model instance. """ super().__init__( expected_layout, output_sample_rate, output_frame_size, input_sample_rate=input_sample_rate, ) self.can_interrupt = can_interrupt self.expected_layout: Literal["mono", "stereo"] = expected_layout self.model = model or get_silero_model() self.fn = fn self.is_async = inspect.isasyncgenfunction(fn) self.event = Event() self.state = AppState() self.generator: ( Generator[EmitType, None, None] | AsyncGenerator[EmitType, None] | None ) = None self.model_options = model_options self.algo_options = algo_options or AlgoOptions() self.startup_fn = startup_fn @property def _needs_additional_inputs(self) -> bool: """Checks if the reply function `fn` expects additional arguments.""" return len(inspect.signature(self.fn).parameters) > 1 def start_up(self): """ Executes the startup function `startup_fn` if provided. Waits for additional arguments if `_needs_additional_inputs` is True before calling `startup_fn`. Sets the `event` after completion. 
""" if self.startup_fn: if self._needs_additional_inputs: self.wait_for_args_sync() args = self.latest_args[1:] else: args = () self.generator = self.startup_fn(*args) self.event.set() def copy(self): """Creates a new instance of ReplyOnPause with the same configuration.""" return ReplyOnPause( self.fn, self.startup_fn, self.algo_options, self.model_options, self.can_interrupt, self.expected_layout, self.output_sample_rate, self.output_frame_size, self.input_sample_rate, self.model, ) def determine_pause( self, audio: np.ndarray, sampling_rate: int, state: AppState ) -> bool: """ Analyzes an audio chunk to detect if a significant pause occurred after speech. Uses the VAD model to measure speech duration within the chunk. Updates the application state (`state`) regarding whether talking has started and accumulates speech segments. Args: audio: The numpy array containing the audio chunk. sampling_rate: The sample rate of the audio chunk. state: The current application state. Returns: True if a pause satisfying the configured thresholds is detected after speech has started, False otherwise. """ duration = len(audio) / sampling_rate if duration >= self.algo_options.audio_chunk_duration: dur_vad, _ = self.model.vad((sampling_rate, audio), self.model_options) logger.debug("VAD duration: %s", dur_vad) if ( dur_vad > self.algo_options.started_talking_threshold and not state.started_talking ): state.started_talking = True logger.debug("Started talking") self.send_message_sync(create_message("log", "started_talking")) if state.started_talking: if state.stream is None: state.stream = audio else: state.stream = np.concatenate((state.stream, audio)) state.buffer = None if dur_vad < self.algo_options.speech_threshold and state.started_talking: return True return False def process_audio(self, audio: tuple[int, np.ndarray], state: AppState) -> None: """ Processes an incoming audio frame. Appends the frame to the buffer, runs pause detection on the buffer, and updates the application state. Args: audio: A tuple containing the sample rate and the audio frame data. state: The current application state to update. """ frame_rate, array = audio array = np.squeeze(array) if not state.sampling_rate: state.sampling_rate = frame_rate if state.buffer is None: state.buffer = array else: state.buffer = np.concatenate((state.buffer, array)) pause_detected = self.determine_pause( state.buffer, state.sampling_rate, self.state ) state.pause_detected = pause_detected def receive(self, frame: tuple[int, np.ndarray]) -> None: """ Receives an audio frame from the stream. Processes the audio frame using `process_audio`. If a pause is detected, it sets the `event`. If interruption is enabled and a reply is ongoing, it closes the current generator and clears the processing queue. Args: frame: A tuple containing the sample rate and the audio frame data. """ if self.state.responding and not self.can_interrupt: return self.process_audio(frame, self.state) if self.state.pause_detected: self.event.set() if self.can_interrupt and self.state.responding: self._close_generator() self.generator = None if self.can_interrupt: self.clear_queue() def _close_generator(self): """ Safely closes the active reply generator (`self.generator`). Handles both synchronous and asynchronous generators, ensuring proper resource cleanup (e.g., calling `aclose()` or `close()`). Logs any errors during closure. 
""" if self.generator is None: return try: if self.is_async: # For async generators, we need to call aclose() if hasattr(self.generator, "aclose"): asyncio.run_coroutine_threadsafe( cast(AsyncGenerator[EmitType, None], self.generator).aclose(), self.loop, ).result(timeout=1.0) # Add timeout to prevent blocking else: # For sync generators, we can just exhaust it or close it if hasattr(self.generator, "close"): cast(Generator[EmitType, None, None], self.generator).close() except Exception as e: logger.debug(f"Error closing generator: {e}") def reset(self): """ Resets the handler state to its initial condition. Clears accumulated audio, resets state flags, closes any active generator, and clears the event flag. Also handles resetting argument state for phone mode. """ super().reset() if self.phone_mode: self.args_set.set() self.generator = None self.event.clear() self.state = AppState() def trigger_response(self): """ Manually triggers the response generation process. Sets the event flag, effectively simulating a pause detection. Initializes the stream buffer if it's empty. """ self.event.set() if self.state.stream is None: self.state.stream = np.array([], dtype=np.int16) async def async_iterate(self, generator) -> EmitType: """Helper function to get the next item from an async generator.""" return await anext(generator) def emit(self): """ Produces the next output chunk from the reply generator (`fn`). This method is called repeatedly after a pause is detected (event is set). If the generator is not already running, it initializes it by calling `fn` with the accumulated audio and any required additional arguments. It then yields the next item from the generator. Handles both sync and async generators. Resets the state upon generator completion or error. Returns: The next output item from the generator, or None if no pause event has occurred or the generator is exhausted. Raises: Exception: Re-raises exceptions occurring within the `fn` generator. 
""" if not self.event.is_set(): return None else: if not self.generator: self.send_message_sync(create_message("log", "pause_detected")) if self._needs_additional_inputs and not self.args_set.is_set(): if not self.phone_mode: self.wait_for_args_sync() else: self.latest_args = [None] self.args_set.set() logger.debug("Creating generator") audio = cast(np.ndarray, self.state.stream).reshape(1, -1) if self._needs_additional_inputs: self.latest_args[0] = (self.state.sampling_rate, audio) self.generator = self.fn(*self.latest_args) # type: ignore else: self.generator = self.fn((self.state.sampling_rate, audio)) # type: ignore logger.debug("Latest args: %s", self.latest_args) self.state = self.state.new() self.state.responding = True try: if self.is_async: output = asyncio.run_coroutine_threadsafe( self.async_iterate(self.generator), self.loop ).result() else: output = next(self.generator) # type: ignore audio, additional_outputs = split_output(output) if audio is not None: self.send_message_sync(create_message("log", "response_starting")) self.state.responded_audio = True if self.phone_mode: if isinstance(additional_outputs, AdditionalOutputs): self.latest_args = [None] + list(additional_outputs.args) return output except (StopIteration, StopAsyncIteration): if not self.state.responded_audio: self.send_message_sync(create_message("log", "response_starting")) self.reset() except Exception as e: import traceback traceback.print_exc() logger.debug("Error in ReplyOnPause: %s", e) self.reset() raise e ``` ## /backend/fastrtc/reply_on_stopwords.py ```py path="/backend/fastrtc/reply_on_stopwords.py" import asyncio import logging import re from collections.abc import Callable from typing import Literal import numpy as np from .reply_on_pause import ( AlgoOptions, AppState, ModelOptions, PauseDetectionModel, ReplyFnGenerator, ReplyOnPause, ) from .speech_to_text import get_stt_model, stt_for_chunks from .utils import audio_to_float32, create_message logger = logging.getLogger(__name__) class ReplyOnStopWordsState(AppState): """Extends AppState to include state specific to stop word detection.""" stop_word_detected: bool = False post_stop_word_buffer: np.ndarray | None = None started_talking_pre_stop_word: bool = False def new(self): """Creates a new instance of ReplyOnStopWordsState.""" return ReplyOnStopWordsState() class ReplyOnStopWords(ReplyOnPause): """ A stream handler that extends ReplyOnPause to trigger based on stop words followed by a pause. This handler listens to the incoming audio stream, performs Speech-to-Text (STT) to detect predefined stop words. Once a stop word is detected, it waits for a subsequent pause in speech (using the VAD model) before triggering the reply function (`fn`) with the audio recorded *after* the stop word. Attributes: stop_words (list[str]): A list of words or phrases that trigger the pause detection. state (ReplyOnStopWordsState): The current state of the stop word and pause detection logic. stt_model: The Speech-to-Text model instance used for detecting stop words. """ def __init__( self, fn: ReplyFnGenerator, stop_words: list[str], startup_fn: Callable | None = None, algo_options: AlgoOptions | None = None, model_options: ModelOptions | None = None, can_interrupt: bool = True, expected_layout: Literal["mono", "stereo"] = "mono", output_sample_rate: int = 24000, output_frame_size: int | None = None, # Deprecated input_sample_rate: int = 48000, model: PauseDetectionModel | None = None, ): """ Initializes the ReplyOnStopWords handler. 
Args: fn: The generator function to execute upon stop word and pause detection. It receives `(sample_rate, audio_array)` and optionally `*args`. stop_words: A list of strings (words or phrases) to listen for. Detection is case-insensitive and ignores punctuation. startup_fn: An optional function to run once at the beginning. algo_options: Options for the pause detection algorithm (used after stop word). model_options: Options for the VAD model. can_interrupt: If True, incoming audio during `fn` execution will stop the generator and process the new audio. expected_layout: Expected input audio layout ('mono' or 'stereo'). output_sample_rate: The sample rate expected for audio yielded by `fn`. output_frame_size: Deprecated. input_sample_rate: The expected sample rate of incoming audio. model: An optional pre-initialized VAD model instance. """ super().__init__( fn, algo_options=algo_options, startup_fn=startup_fn, model_options=model_options, can_interrupt=can_interrupt, expected_layout=expected_layout, output_sample_rate=output_sample_rate, output_frame_size=output_frame_size, input_sample_rate=input_sample_rate, model=model, ) self.stop_words = stop_words self.state = ReplyOnStopWordsState() self.stt_model = get_stt_model("moonshine/base") def stop_word_detected(self, text: str) -> bool: """ Checks if any of the configured stop words are present in the text. Performs a case-insensitive search, treating multi-word stop phrases correctly and ignoring basic punctuation. Args: text: The text transcribed from the audio. Returns: True if a stop word is found, False otherwise. """ for stop_word in self.stop_words: stop_word = stop_word.lower().strip().split(" ") if bool( re.search( r"\b" + r"\s+".join(map(re.escape, stop_word)) + r"[.,!?]*\b", text.lower(), ) ): logger.debug("Stop word detected: %s", stop_word) return True return False async def _send_stopword( self, ): """Internal async method to send a 'stopword' message via the channel.""" if self.channel: self.channel.send(create_message("stopword", "")) logger.debug("Sent stopword") def send_stopword(self): """Sends a 'stopword' message asynchronously via the communication channel.""" asyncio.run_coroutine_threadsafe(self._send_stopword(), self.loop) def determine_pause( # type: ignore self, audio: np.ndarray, sampling_rate: int, state: ReplyOnStopWordsState ) -> bool: """ Analyzes an audio chunk to detect stop words and subsequent pauses. Overrides the `ReplyOnPause.determine_pause` method. First, it performs STT on the audio buffer to detect stop words. Once a stop word is detected (`state.stop_word_detected` is True), it then uses the VAD model (similar to `ReplyOnPause`) to detect a pause in the audio *following* the stop word. Args: audio: The numpy array containing the audio chunk. sampling_rate: The sample rate of the audio chunk. state: The current application state (ReplyOnStopWordsState). Returns: True if a stop word has been detected and a subsequent pause satisfying the configured thresholds is detected, False otherwise. 
""" import librosa duration = len(audio) / sampling_rate if duration >= self.algo_options.audio_chunk_duration: if not state.stop_word_detected: audio_f32 = audio_to_float32(audio) audio_rs = librosa.resample( audio_f32, orig_sr=sampling_rate, target_sr=16000 ) if state.post_stop_word_buffer is None: state.post_stop_word_buffer = audio_rs else: state.post_stop_word_buffer = np.concatenate( (state.post_stop_word_buffer, audio_rs) ) if len(state.post_stop_word_buffer) / 16000 > 2: state.post_stop_word_buffer = state.post_stop_word_buffer[-32000:] dur_vad, chunks = self.model.vad( (16000, state.post_stop_word_buffer), self.model_options, ) text = stt_for_chunks( self.stt_model, (16000, state.post_stop_word_buffer), chunks ) logger.debug(f"STT: {text}") state.stop_word_detected = self.stop_word_detected(text) if state.stop_word_detected: logger.debug("Stop word detected") self.send_stopword() state.buffer = None else: dur_vad, _ = self.model.vad((sampling_rate, audio), self.model_options) logger.debug("VAD duration: %s", dur_vad) if ( dur_vad > self.algo_options.started_talking_threshold and not state.started_talking and state.stop_word_detected ): state.started_talking = True logger.debug("Started talking") if state.started_talking: if state.stream is None: state.stream = audio else: state.stream = np.concatenate((state.stream, audio)) state.buffer = None if ( dur_vad < self.algo_options.speech_threshold and state.started_talking and state.stop_word_detected ): return True return False def reset(self): """ Resets the handler state to its initial condition. Clears accumulated audio, resets state flags (including stop word state), closes any active generator, and clears the event flag. """ super().reset() self.generator = None self.event.clear() self.state = ReplyOnStopWordsState() def copy(self): """Creates a new instance of ReplyOnStopWords with the same configuration.""" return ReplyOnStopWords( self.fn, self.stop_words, self.startup_fn, self.algo_options, self.model_options, self.can_interrupt, self.expected_layout, self.output_sample_rate, self.output_frame_size, self.input_sample_rate, self.model, ) ``` ## /backend/fastrtc/speech_to_text/__init__.py ```py path="/backend/fastrtc/speech_to_text/__init__.py" from .stt_ import MoonshineSTT, get_stt_model, stt_for_chunks __all__ = ["get_stt_model", "MoonshineSTT", "get_stt_model", "stt_for_chunks"] ``` ## /backend/fastrtc/speech_to_text/stt_.py ```py path="/backend/fastrtc/speech_to_text/stt_.py" from functools import lru_cache from pathlib import Path from typing import Literal, Protocol import click import librosa import numpy as np from numpy.typing import NDArray from ..utils import AudioChunk, audio_to_float32 curr_dir = Path(__file__).parent class STTModel(Protocol): def stt(self, audio: tuple[int, NDArray[np.int16 | np.float32]]) -> str: ... class MoonshineSTT(STTModel): def __init__( self, model: Literal["moonshine/base", "moonshine/tiny"] = "moonshine/base" ): try: from moonshine_onnx import MoonshineOnnxModel, load_tokenizer except (ImportError, ModuleNotFoundError): raise ImportError( "Install fastrtc[stt] for speech-to-text and stopword detection support." 
) self.model = MoonshineOnnxModel(model_name=model) self.tokenizer = load_tokenizer() def stt(self, audio: tuple[int, NDArray[np.int16 | np.float32]]) -> str: sr, audio_np = audio # type: ignore audio_np = audio_to_float32(audio_np) if sr != 16000: audio_np: NDArray[np.float32] = librosa.resample( audio_np, orig_sr=sr, target_sr=16000 ) if audio_np.ndim == 1: audio_np = audio_np.reshape(1, -1) tokens = self.model.generate(audio_np) return self.tokenizer.decode_batch(tokens)[0] @lru_cache def get_stt_model( model: Literal["moonshine/base", "moonshine/tiny"] = "moonshine/base", ) -> STTModel: import os os.environ["TOKENIZERS_PARALLELISM"] = "false" m = MoonshineSTT(model) from moonshine_onnx import load_audio audio = load_audio(str(curr_dir / "test_file.wav")) print(click.style("INFO", fg="green") + ":\t Warming up STT model.") m.stt((16000, audio)) print(click.style("INFO", fg="green") + ":\t STT model warmed up.") return m def stt_for_chunks( stt_model: STTModel, audio: tuple[int, NDArray[np.int16 | np.float32]], chunks: list[AudioChunk], ) -> str: sr, audio_np = audio return " ".join( [ stt_model.stt((sr, audio_np[chunk["start"] : chunk["end"]])) for chunk in chunks ] ) ``` ## /backend/fastrtc/speech_to_text/test_file.wav Binary file available at https://raw.githubusercontent.com/gradio-app/fastrtc/refs/heads/main/backend/fastrtc/speech_to_text/test_file.wav ## /backend/fastrtc/stream.py ```py path="/backend/fastrtc/stream.py" import inspect import logging from collections.abc import Callable from contextlib import AbstractAsyncContextManager from pathlib import Path from typing import ( Any, Literal, TypedDict, cast, ) import anyio import gradio as gr from fastapi import FastAPI, Request, WebSocket from fastapi.responses import HTMLResponse from gradio import Blocks from gradio.components.base import Component from pydantic import BaseModel from typing_extensions import NotRequired from .tracks import HandlerType, StreamHandlerImpl from .utils import RTCConfigurationCallable from .webrtc import WebRTC from .webrtc_connection_mixin import WebRTCConnectionMixin from .websocket import WebSocketHandler logger = logging.getLogger(__name__) curr_dir = Path(__file__).parent class Body(BaseModel): sdp: str | None = None candidate: dict[str, Any] | None = None type: str webrtc_id: str class UIArgs(TypedDict): """ UI customization arguments for the Gradio Blocks UI of the Stream class """ title: NotRequired[str] """Title of the demo""" subtitle: NotRequired[str] """Subtitle of the demo. Text will be centered and displayed below the title.""" icon: NotRequired[str] """Icon to display on the button instead of the wave animation. The icon should be a path/url to a .svg/.png/.jpeg file.""" icon_button_color: NotRequired[str] """Color of the icon button. Default is var(--color-accent) of the demo theme.""" pulse_color: NotRequired[str] """Color of the pulse animation. Default is var(--color-accent) of the demo theme.""" icon_radius: NotRequired[int] """Border radius of the icon button expressed as a percentage of the button size. Default is 50%.""" send_input_on: NotRequired[Literal["submit", "change"]] """When to send the input to the handler. Default is "change". If "submit", the input will be sent when the submit event is triggered by the user. If "change", the input will be sent whenever the user changes the input value. 
""" hide_title: NotRequired[bool] """If True, the title and subtitle will not be displayed.""" class Stream(WebRTCConnectionMixin): """ Define an audio or video stream with a built-in UI, mountable on a FastAPI app. This class encapsulates the logic for handling real-time communication (WebRTC) streams, including setting up peer connections, managing tracks, generating a Gradio user interface, and integrating with FastAPI for API endpoints. It supports different modes (send, receive, send-receive) and modalities (audio, video, audio-video), and can optionally handle additional Gradio input/output components alongside the stream. It also provides functionality for telephone integration via the FastPhone method. Attributes: mode (Literal["send-receive", "receive", "send"]): The direction of the stream. modality (Literal["video", "audio", "audio-video"]): The type of media stream. rtp_params (dict[str, Any] | None): Parameters for RTP encoding. event_handler (HandlerType): The main function to process stream data. concurrency_limit (int): The maximum number of concurrent connections allowed. time_limit (float | None): Time limit in seconds for the event handler execution. allow_extra_tracks (bool): Whether to allow extra tracks beyond the specified modality. additional_output_components (list[Component] | None): Extra Gradio output components. additional_input_components (list[Component] | None): Extra Gradio input components. additional_outputs_handler (Callable | None): Handler for additional outputs. track_constraints (dict[str, Any] | None): Constraints for media tracks (e.g., resolution). webrtc_component (WebRTC): The underlying Gradio WebRTC component instance. rtc_configuration (dict[str, Any] | None): Configuration for the RTCPeerConnection (e.g., ICE servers). _ui (Blocks): The Gradio Blocks UI instance. """ def __init__( self, handler: HandlerType, *, additional_outputs_handler: Callable | None = None, mode: Literal["send-receive", "receive", "send"] = "send-receive", modality: Literal["video", "audio", "audio-video"] = "video", concurrency_limit: int | None | Literal["default"] = "default", time_limit: float | None = None, allow_extra_tracks: bool = False, rtp_params: dict[str, Any] | None = None, rtc_configuration: RTCConfigurationCallable | None = None, server_rtc_configuration: dict[str, Any] | None = None, track_constraints: dict[str, Any] | None = None, additional_inputs: list[Component] | None = None, additional_outputs: list[Component] | None = None, ui_args: UIArgs | None = None, ): """ Initialize the Stream instance. Args: handler: The function to handle incoming stream data and return output data. additional_outputs_handler: An optional function to handle updates to additional output components. mode: The direction of the stream ('send', 'receive', or 'send-receive'). modality: The type of media ('video', 'audio', or 'audio-video'). concurrency_limit: Maximum number of concurrent connections. 'default' maps to 1. time_limit: Maximum execution time for the handler function in seconds. allow_extra_tracks: If True, allows connections with tracks not matching the modality. rtp_params: Optional dictionary of RTP encoding parameters. rtc_configuration: Optional Callable or dictionary for RTCPeerConnection configuration (e.g., ICE servers). Required when deploying on Colab or Spaces. server_rtc_configuration: Optional dictionary for RTCPeerConnection configuration on the server side. 
Note that setting iceServers to be an empty list will mean no ICE servers will be used in the server. track_constraints: Optional dictionary of constraints for media tracks (e.g., resolution, frame rate). additional_inputs: Optional list of extra Gradio input components. additional_outputs: Optional list of extra Gradio output components. Requires `additional_outputs_handler`. ui_args: Optional dictionary to customize the default UI appearance (title, subtitle, icon, etc.). Raises: ValueError: If `additional_outputs` are provided without `additional_outputs_handler`. """ WebRTCConnectionMixin.__init__(self) self.mode = mode self.modality = modality self.rtp_params = rtp_params self.event_handler = handler self.concurrency_limit = cast( (int), 1 if concurrency_limit in ["default", None] else concurrency_limit, ) self.concurrency_limit_gradio = cast( int | Literal["default"] | None, concurrency_limit ) self.time_limit = time_limit self.allow_extra_tracks = allow_extra_tracks self.additional_output_components = additional_outputs self.additional_input_components = additional_inputs self.additional_outputs_handler = additional_outputs_handler self.track_constraints = track_constraints self.webrtc_component: WebRTC self.rtc_configuration = rtc_configuration self.server_rtc_configuration = self.convert_to_aiortc_format( server_rtc_configuration ) self._ui = self._generate_default_ui(ui_args) self._ui.launch = self._wrap_gradio_launch(self._ui.launch) def mount(self, app: FastAPI, path: str = ""): """ Mount the stream's API endpoints onto a FastAPI application. This method adds the necessary routes (`/webrtc/offer`, `/telephone/handler`, `/telephone/incoming`, `/websocket/offer`) to the provided FastAPI app, prefixed with the optional `path`. It also injects a startup message into the app's lifespan. Args: app: The FastAPI application instance. path: An optional URL prefix for the mounted routes. """ from fastapi import APIRouter router = APIRouter(prefix=path) router.post("/webrtc/offer")(self.offer) router.websocket("/telephone/handler")(self.telephone_handler) router.post("/telephone/incoming")(self.handle_incoming_call) router.websocket("/websocket/offer")(self.websocket_offer) lifespan = self._inject_startup_message(app.router.lifespan_context) app.router.lifespan_context = lifespan app.include_router(router) @staticmethod def _print_error(env: Literal["colab", "spaces"]): """ Print an error message and raise RuntimeError for missing rtc_configuration. Used internally when running in Colab or Spaces without necessary WebRTC setup. Args: env: The environment ('colab' or 'spaces') where the error occurred. Raises: RuntimeError: Always raised after printing the error message. """ import click print( click.style("ERROR", fg="red") + f":\t Running in {env} is not possible without providing a valid rtc_configuration. " + "See " + click.style("https://fastrtc.org/deployment/", fg="cyan") + " for more information." ) raise RuntimeError( f"Running in {env} is not possible without providing a valid rtc_configuration. " + "See https://fastrtc.org/deployment/ for more information." ) def _check_colab_or_spaces(self): """ Check if running in Colab or Spaces and if rtc_configuration is missing. Calls `_print_error` if the conditions are met. Raises: RuntimeError: If running in Colab/Spaces without `rtc_configuration`. 
""" from gradio.utils import colab_check, get_space if colab_check() and not self.rtc_configuration: self._print_error("colab") if get_space() and not self.rtc_configuration: self._print_error("spaces") def _wrap_gradio_launch(self, callable): """ Wrap the Gradio launch method to inject environment checks. Ensures that `_check_colab_or_spaces` is called during the application lifespan when `Blocks.launch()` is invoked. Args: callable: The original `gradio.Blocks.launch` method. Returns: A wrapped version of the launch method. """ import contextlib def wrapper(*args, **kwargs): lifespan = kwargs.get("app_kwargs", {}).get("lifespan", None) @contextlib.asynccontextmanager async def new_lifespan(app: FastAPI): if lifespan is None: self._check_colab_or_spaces() yield else: async with lifespan(app): self._check_colab_or_spaces() yield if "app_kwargs" not in kwargs: kwargs["app_kwargs"] = {} kwargs["app_kwargs"]["lifespan"] = new_lifespan return callable(*args, **kwargs) return wrapper def _inject_startup_message( self, lifespan: Callable[[FastAPI], AbstractAsyncContextManager] | None = None ): """ Create a FastAPI lifespan context manager to print startup messages and check environment. Args: lifespan: An optional existing lifespan context manager to wrap. Returns: An async context manager function suitable for `FastAPI(lifespan=...)`. """ import contextlib import click def print_startup_message(): self._check_colab_or_spaces() print( click.style("INFO", fg="green") + ":\t Visit " + click.style("https://fastrtc.org/userguide/api/", fg="cyan") + " for WebRTC or Websocket API docs." ) @contextlib.asynccontextmanager async def new_lifespan(app: FastAPI): if lifespan is None: print_startup_message() yield else: async with lifespan(app): print_startup_message() yield return new_lifespan def _generate_default_ui( self, ui_args: UIArgs | None = None, ) -> Blocks: """ Generate the default Gradio UI based on mode, modality, and arguments. Constructs a `gradio.Blocks` interface with the appropriate WebRTC component and any specified additional input/output components. Args: ui_args: Optional dictionary containing UI customization arguments (title, subtitle, icon, etc.). Returns: A `gradio.Blocks` instance representing the generated UI. Raises: ValueError: If `additional_outputs` are provided without `additional_outputs_handler`. ValueError: If the combination of `mode` and `modality` is invalid or not supported for UI generation. """ ui_args = ui_args or {} same_components = [] additional_input_components = self.additional_input_components or [] additional_output_components = self.additional_output_components or [] if additional_output_components and not self.additional_outputs_handler: raise ValueError( "additional_outputs_handler must be provided if there are additional output components." ) if additional_input_components and additional_output_components: same_components = [ component for component in additional_input_components if component in additional_output_components ] for component in additional_output_components: if component in same_components: same_components.append(component) if self.modality == "video" and self.mode == "receive": with gr.Blocks() as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): with gr.Column(): if additional_input_components: for component in additional_input_components: component.render() button = gr.Button("Start Stream", variant="primary") with gr.Column(): output_video = WebRTC( label="Video Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="receive", modality="video", ) self.webrtc_component = output_video for component in additional_output_components: if component not in same_components: component.render() output_video.stream( fn=self.event_handler, inputs=self.additional_input_components, outputs=[output_video], trigger=button.click, time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler output_video.on_additional_outputs( self.additional_outputs_handler, concurrency_limit=self.concurrency_limit_gradio, # type: ignore inputs=additional_output_components, outputs=additional_output_components, ) elif self.modality == "video" and self.mode == "send": with gr.Blocks() as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): if additional_input_components: with gr.Column(): for component in additional_input_components: component.render() with gr.Column(): output_video = WebRTC( label="Video Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="send", modality="video", ) self.webrtc_component = output_video for component in additional_output_components: if component not in same_components: component.render() output_video.stream( fn=self.event_handler, inputs=[output_video] + additional_input_components, outputs=[output_video], time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler output_video.on_additional_outputs( self.additional_outputs_handler, concurrency_limit=self.concurrency_limit_gradio, # type: ignore inputs=additional_output_components, outputs=additional_output_components, ) elif self.modality == "video" and self.mode == "send-receive": css = """.my-group {max-width: 600px !important; max-height: 600 !important;} .my-column {display: flex !important; justify-content: center !important; align-items: center !important};""" with gr.Blocks(css=css) as demo: gr.HTML( f"""

{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Column(elem_classes=["my-column"]): with gr.Group(elem_classes=["my-group"]): image = WebRTC( label="Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="send-receive", modality="video", ) for component in additional_input_components: component.render() if additional_output_components: with gr.Column(): for component in additional_output_components: if component not in same_components: component.render() self.webrtc_component = image image.stream( fn=self.event_handler, inputs=[image] + additional_input_components, outputs=[image], time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler image.on_additional_outputs( self.additional_outputs_handler, inputs=additional_output_components, outputs=additional_output_components, concurrency_limit=self.concurrency_limit_gradio, # type: ignore ) elif self.modality == "audio" and self.mode == "receive": with gr.Blocks() as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): with gr.Column(): for component in additional_input_components: component.render() button = gr.Button("Start Stream", variant="primary") with gr.Column(): output_video = WebRTC( label="Audio Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="receive", modality="audio", icon=ui_args.get("icon"), icon_button_color=ui_args.get("icon_button_color"), pulse_color=ui_args.get("pulse_color"), icon_radius=ui_args.get("icon_radius"), ) self.webrtc_component = output_video for component in additional_output_components: if component not in same_components: component.render() output_video.stream( fn=self.event_handler, inputs=self.additional_input_components, outputs=[output_video], trigger=button.click, time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler output_video.on_additional_outputs( self.additional_outputs_handler, inputs=additional_output_components, outputs=additional_output_components, concurrency_limit=self.concurrency_limit_gradio, # type: ignore ) elif self.modality == "audio" and self.mode == "send": with gr.Blocks() as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): with gr.Column(): with gr.Group(): image = WebRTC( label="Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="send", modality="audio", icon=ui_args.get("icon"), icon_button_color=ui_args.get("icon_button_color"), pulse_color=ui_args.get("pulse_color"), icon_radius=ui_args.get("icon_radius"), ) self.webrtc_component = image for component in additional_input_components: if component not in same_components: component.render() if additional_output_components: with gr.Column(): for component in additional_output_components: component.render() image.stream( fn=self.event_handler, inputs=[image] + additional_input_components, outputs=[image], time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler image.on_additional_outputs( self.additional_outputs_handler, inputs=additional_output_components, outputs=additional_output_components, concurrency_limit=self.concurrency_limit_gradio, # type: ignore ) elif self.modality == "audio" and self.mode == "send-receive": with gr.Blocks() as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): with gr.Column(): with gr.Group(): image = WebRTC( label="Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="send-receive", modality="audio", icon=ui_args.get("icon"), icon_button_color=ui_args.get("icon_button_color"), pulse_color=ui_args.get("pulse_color"), icon_radius=ui_args.get("icon_radius"), ) self.webrtc_component = image for component in additional_input_components: if component not in same_components: component.render() if additional_output_components: with gr.Column(): for component in additional_output_components: component.render() image.stream( fn=self.event_handler, inputs=[image] + additional_input_components, outputs=[image], time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler image.on_additional_outputs( self.additional_outputs_handler, inputs=additional_output_components, outputs=additional_output_components, concurrency_limit=self.concurrency_limit_gradio, # type: ignore ) elif self.modality == "audio-video" and self.mode == "send-receive": css = """.my-group {max-width: 600px !important; max-height: 600 !important;} .my-column {display: flex !important; justify-content: center !important; align-items: center !important};""" with gr.Blocks(css=css) as demo: if not ui_args.get("hide_title"): gr.HTML( f"""

{ui_args.get("title", "Audio Video Streaming (Powered by FastRTC ⚡️)")}

""" ) if ui_args.get("subtitle"): gr.Markdown( f"""
{ui_args.get("subtitle")}
""" ) with gr.Row(): with gr.Column(elem_classes=["my-column"]): with gr.Group(elem_classes=["my-group"]): image = WebRTC( label="Stream", rtc_configuration=self.rtc_configuration, track_constraints=self.track_constraints, mode="send-receive", modality="audio-video", icon=ui_args.get("icon"), icon_button_color=ui_args.get("icon_button_color"), pulse_color=ui_args.get("pulse_color"), icon_radius=ui_args.get("icon_radius"), ) self.webrtc_component = image for component in additional_input_components: if component not in same_components: component.render() if additional_output_components: with gr.Column(): for component in additional_output_components: component.render() image.stream( fn=self.event_handler, inputs=[image] + additional_input_components, outputs=[image], time_limit=self.time_limit, concurrency_limit=self.concurrency_limit, # type: ignore send_input_on=ui_args.get("send_input_on", "change"), ) if additional_output_components: assert self.additional_outputs_handler image.on_additional_outputs( self.additional_outputs_handler, inputs=additional_output_components, outputs=additional_output_components, concurrency_limit=self.concurrency_limit_gradio, # type: ignore ) else: raise ValueError(f"Invalid modality: {self.modality} and mode: {self.mode}") return demo @property def ui(self) -> Blocks: """ Get the Gradio Blocks UI instance associated with this stream. Returns: The `gradio.Blocks` UI instance. """ return self._ui @ui.setter def ui(self, blocks: Blocks): """ Set a custom Gradio Blocks UI for this stream. Args: blocks: The `gradio.Blocks` instance to use as the UI. """ self._ui = blocks async def offer(self, body: Body): """ Handle an incoming WebRTC offer via HTTP POST. Processes the SDP offer and ICE candidates from the client to establish a WebRTC connection. Args: body: A Pydantic model containing the SDP offer, optional ICE candidate, type ('offer'), and a unique WebRTC ID. Returns: A dictionary containing the SDP answer generated by the server. """ return await self.handle_offer( body.model_dump(), set_outputs=self.set_additional_outputs(body.webrtc_id) ) async def get_rtc_configuration(self): if inspect.isfunction(self.rtc_configuration): if inspect.iscoroutinefunction(self.rtc_configuration): return await self.rtc_configuration() else: return anyio.to_thread.run_sync(self.rtc_configuration) # type: ignore else: return self.rtc_configuration async def handle_incoming_call(self, request: Request): """ Handle incoming telephone calls (e.g., via Twilio). Generates TwiML instructions to connect the incoming call to the WebSocket handler (`/telephone/handler`) for audio streaming. Args: request: The FastAPI Request object for the incoming call webhook. Returns: An HTMLResponse containing the TwiML instructions as XML. """ from twilio.twiml.voice_response import Connect, VoiceResponse response = VoiceResponse() response.say("Connecting to the AI assistant.") connect = Connect() path = request.url.path.removesuffix("/telephone/incoming") connect.stream(url=f"wss://{request.url.hostname}{path}/telephone/handler") response.append(connect) response.say("The call has been disconnected.") return HTMLResponse(content=str(response), media_type="application/xml") async def telephone_handler(self, websocket: WebSocket): """ The websocket endpoint for streaming audio over Twilio phone. Args: websocket: The incoming WebSocket connection object. 
""" handler = cast(StreamHandlerImpl, self.event_handler.copy()) # type: ignore handler.phone_mode = True async def set_handler(s: str, a: WebSocketHandler): if len(self.connections) >= self.concurrency_limit: # type: ignore await cast(WebSocket, a.websocket).send_json( { "status": "failed", "meta": { "error": "concurrency_limit_reached", "limit": self.concurrency_limit, }, } ) await websocket.close() return ws = WebSocketHandler( handler, set_handler, lambda s: None, lambda s: lambda a: None ) await ws.handle_websocket(websocket) async def websocket_offer(self, websocket: WebSocket): """ Handle WebRTC signaling over a WebSocket connection. Provides an alternative to the HTTP POST `/webrtc/offer` endpoint for exchanging SDP offers/answers and ICE candidates via WebSocket messages. Args: websocket: The incoming WebSocket connection object. """ handler = cast(StreamHandlerImpl, self.event_handler.copy()) # type: ignore handler.phone_mode = False async def set_handler(s: str, a: WebSocketHandler): if len(self.connections) >= self.concurrency_limit: # type: ignore await cast(WebSocket, a.websocket).send_json( { "status": "failed", "meta": { "error": "concurrency_limit_reached", "limit": self.concurrency_limit, }, } ) await websocket.close() return self.connections[s] = [a] # type: ignore def clean_up(s): self.clean_up(s) ws = WebSocketHandler( handler, set_handler, clean_up, lambda s: self.set_additional_outputs(s) ) await ws.handle_websocket(websocket) def fastphone( self, token: str | None = None, host: str = "127.0.0.1", port: int = 8000, **kwargs, ): """ Launch the FastPhone service for telephone integration. Starts a local FastAPI server, mounts the stream, creates a public tunnel (using Gradio's tunneling), registers the tunnel URL with the FastPhone backend service, and prints the assigned phone number and access code. This allows users to call the phone number and interact with the stream handler. Args: token: Optional Hugging Face Hub token for authentication with the FastPhone service. If None, attempts to find one automatically. host: The local host address to bind the server to. port: The local port to bind the server to. **kwargs: Additional keyword arguments passed to `uvicorn.run`. Raises: httpx.HTTPStatusError: If registration with the FastPhone service fails. RuntimeError: If running in Colab/Spaces without `rtc_configuration`. 
""" import atexit import inspect import secrets import threading import time import urllib.parse import click import httpx import uvicorn from gradio.networking import setup_tunnel from gradio.tunneling import CURRENT_TUNNELS from huggingface_hub import get_token app = FastAPI() self.mount(app) t = threading.Thread( target=uvicorn.run, args=(app,), kwargs={"host": host, "port": port, **kwargs}, ) t.start() # Check if setup_tunnel accepts share_server_tls_certificate parameter setup_tunnel_params = inspect.signature(setup_tunnel).parameters tunnel_kwargs = { "local_host": host, "local_port": port, "share_token": secrets.token_urlsafe(32), "share_server_address": None, } if "share_server_tls_certificate" in setup_tunnel_params: tunnel_kwargs["share_server_tls_certificate"] = None url = setup_tunnel(**tunnel_kwargs) host = urllib.parse.urlparse(url).netloc URL = "https://api.fastrtc.org" try: r = httpx.post( URL + "/register", json={"url": host}, headers={"Authorization": token or get_token() or ""}, ) r.raise_for_status() except Exception: URL = "https://fastrtc-fastphone.hf.space" r = httpx.post( URL + "/register", json={"url": host}, headers={"Authorization": token or get_token() or ""}, ) r.raise_for_status() if r.status_code == 202: print( click.style("INFO", fg="orange") + ":\t You have " + "run out of your quota" ) return data = r.json() code = f"{data['code']}" phone_number = data["phone"] reset_date = data["reset_date"] print( click.style("INFO", fg="green") + ":\t Your FastPhone is now live! Call " + click.style(phone_number, fg="cyan") + " and use code " + click.style(code, fg="cyan") + " to connect to your stream." ) minutes = str(int(data["time_remaining"] // 60)).zfill(2) seconds = str(int(data["time_remaining"] % 60)).zfill(2) print( click.style("INFO", fg="green") + ":\t You have " + click.style(f"{minutes}:{seconds}", fg="cyan") + " minutes remaining in your quota (Resetting on " + click.style(f"{reset_date}", fg="cyan") + ")" ) print( click.style("INFO", fg="green") + ":\t Visit " + click.style( "https://fastrtc.org/userguide/audio/#telephone-integration", fg="cyan", ) + " for information on making your handler compatible with phone usage." ) def unregister(): httpx.post( URL + "/unregister", json={"url": host, "code": code}, headers={"Authorization": token or get_token() or ""}, ) atexit.register(unregister) try: while True: time.sleep(0.1) except (KeyboardInterrupt, OSError): print( click.style("INFO", fg="green") + ":\t Keyboard interruption in main thread... closing server." 
) unregister() t.join(timeout=5) for tunnel in CURRENT_TUNNELS: tunnel.kill() ``` ## /backend/fastrtc/templates/component/assets/worker-lPYB70QI.js ```js path="/backend/fastrtc/templates/component/assets/worker-lPYB70QI.js" (function(){"use strict";const R="https://unpkg.com/@ffmpeg/core@0.12.6/dist/umd/ffmpeg-core.js";var E;(function(t){t.LOAD="LOAD",t.EXEC="EXEC",t.WRITE_FILE="WRITE_FILE",t.READ_FILE="READ_FILE",t.DELETE_FILE="DELETE_FILE",t.RENAME="RENAME",t.CREATE_DIR="CREATE_DIR",t.LIST_DIR="LIST_DIR",t.DELETE_DIR="DELETE_DIR",t.ERROR="ERROR",t.DOWNLOAD="DOWNLOAD",t.PROGRESS="PROGRESS",t.LOG="LOG",t.MOUNT="MOUNT",t.UNMOUNT="UNMOUNT"})(E||(E={}));const a=new Error("unknown message type"),f=new Error("ffmpeg is not loaded, call `await ffmpeg.load()` first"),u=new Error("failed to import ffmpeg-core.js");let r;const O=async({coreURL:t,wasmURL:n,workerURL:e})=>{const o=!r;try{t||(t=R),importScripts(t)}catch{if(t||(t=R.replace("/umd/","/esm/")),self.createFFmpegCore=(await import(t)).default,!self.createFFmpegCore)throw u}const s=t,c=n||t.replace(/.js$/g,".wasm"),b=e||t.replace(/.js$/g,".worker.js");return r=await self.createFFmpegCore({mainScriptUrlOrBlob:`${s}#${btoa(JSON.stringify({wasmURL:c,workerURL:b}))}`}),r.setLogger(i=>self.postMessage({type:E.LOG,data:i})),r.setProgress(i=>self.postMessage({type:E.PROGRESS,data:i})),o},l=({args:t,timeout:n=-1})=>{r.setTimeout(n),r.exec(...t);const e=r.ret;return r.reset(),e},m=({path:t,data:n})=>(r.FS.writeFile(t,n),!0),D=({path:t,encoding:n})=>r.FS.readFile(t,{encoding:n}),S=({path:t})=>(r.FS.unlink(t),!0),I=({oldPath:t,newPath:n})=>(r.FS.rename(t,n),!0),L=({path:t})=>(r.FS.mkdir(t),!0),N=({path:t})=>{const n=r.FS.readdir(t),e=[];for(const o of n){const s=r.FS.stat(`${t}/${o}`),c=r.FS.isDir(s.mode);e.push({name:o,isDir:c})}return e},A=({path:t})=>(r.FS.rmdir(t),!0),w=({fsType:t,options:n,mountPoint:e})=>{const o=t,s=r.FS.filesystems[o];return s?(r.FS.mount(s,n,e),!0):!1},k=({mountPoint:t})=>(r.FS.unmount(t),!0);self.onmessage=async({data:{id:t,type:n,data:e}})=>{const o=[];let s;try{if(n!==E.LOAD&&!r)throw f;switch(n){case E.LOAD:s=await O(e);break;case E.EXEC:s=l(e);break;case E.WRITE_FILE:s=m(e);break;case E.READ_FILE:s=D(e);break;case E.DELETE_FILE:s=S(e);break;case E.RENAME:s=I(e);break;case E.CREATE_DIR:s=L(e);break;case E.LIST_DIR:s=N(e);break;case E.DELETE_DIR:s=A(e);break;case E.MOUNT:s=w(e);break;case E.UNMOUNT:s=k(e);break;default:throw a}}catch(c){self.postMessage({id:t,type:E.ERROR,data:c.toString()});return}s instanceof Uint8Array&&o.push(s.buffer),self.postMessage({id:t,type:n,data:s},o)}})(); ```
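Taken together, the files above make up the public `Stream` API. The following sketch is illustrative only and is not a file from this repository: it assumes the `fastrtc` package is installed, and the `flip_vertically` handler is a hypothetical example of the video callbacks that `Stream` accepts.

```py
# Illustrative usage sketch (not part of the repository).
import numpy as np
from fastapi import FastAPI

from fastrtc import Stream


def flip_vertically(frame: np.ndarray) -> np.ndarray:
    # Hypothetical video handler: receives a frame and returns the frame to send back.
    return np.flip(frame, axis=0)


# A send-receive video stream built around the handler above.
stream = Stream(handler=flip_vertically, modality="video", mode="send-receive")

# Mount the signaling endpoints (/webrtc/offer, /websocket/offer,
# /telephone/handler, /telephone/incoming) onto a FastAPI app,
# then serve it with `uvicorn module:app`.
app = FastAPI()
stream.mount(app)

# Or launch the auto-generated Gradio UI directly:
#   stream.ui.launch()

# The Moonshine speech-to-text helper can also be used on its own
# (requires the optional `fastrtc[stt]` extra):
#   from fastrtc.speech_to_text import get_stt_model
#   stt = get_stt_model("moonshine/base")
#   text = stt.stt((16000, np.zeros(16000, dtype=np.float32)))
```

Either entry point (`stream.mount(app)` or `stream.ui.launch()`) goes through the same environment checks, so `rtc_configuration` must be supplied when running on Colab or Spaces, as the `Stream` docstring above notes.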