```
├── .github/
│   └── workflows/
│       ├── docs.yml
│       ├── tests-frontend.yml
│       └── tests.yml
├── .gitignore
├── CNAME
├── LICENSE
├── README.md
├── backend/
│   └── fastrtc/
│       ├── __init__.py
│       ├── credentials.py
│       ├── pause_detection/
│       │   ├── __init__.py
│       │   ├── protocol.py
│       │   └── silero.py
│       ├── py.typed
│       ├── reply_on_pause.py
│       ├── reply_on_stopwords.py
│       ├── speech_to_text/
│       │   ├── __init__.py
│       │   ├── stt_.py
│       │   └── test_file.wav
│       ├── stream.py
│       └── templates/
│           └── component/
│               ├── assets/
│               │   └── worker-lPYB70QI.js
│               └── index.js
```
## /.github/workflows/docs.yml
```yml path="/.github/workflows/docs.yml"
name: docs
on:
push:
branches:
- main
pull_request:
branches:
- main
permissions:
contents: write
pull-requests: write
deployments: write
pages: write
jobs:
deploy:
runs-on: ubuntu-latest
if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false)
steps:
- uses: actions/checkout@v4
- name: Configure Git Credentials
run: |
git config user.name github-actions[bot]
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
- uses: actions/setup-python@v5
with:
python-version: 3.x
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
- uses: actions/cache@v4
with:
key: mkdocs-material-${{ env.cache_id }}
path: .cache
restore-keys: |
mkdocs-material-
- run: pip install mkdocs-material mkdocs-llmstxt==0.1.0
- name: Build docs
run: mkdocs build
- name: Deploy to GH Pages (main)
if: github.event_name == 'push'
run: mkdocs gh-deploy --force
- name: Deploy PR Preview
if: github.event_name == 'pull_request'
uses: rossjrw/pr-preview-action@v1
with:
source-dir: ./site
preview-branch: gh-pages
umbrella-dir: pr-preview
action: auto
```
## /.github/workflows/tests-frontend.yml
```yml path="/.github/workflows/tests-frontend.yml"
name: tests
on: [push, pull_request]
jobs:
prettier:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 18
- name: Run prettier
run: |
cd frontend
npm install
npx prettier --check .
```
## /.github/workflows/tests.yml
```yml path="/.github/workflows/tests.yml"
name: tests
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Run linters
run: |
pip install ruff pyright
pip install -e .[dev]
ruff check .
ruff format --check --diff .
pyright
test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
python:
- '3.10'
- '3.13'
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}
- name: Run tests
run: |
python -m pip install -U pip
pip install '.[dev, tts]'
python -m pytest --capture=no
shell: bash
```
## /.gitignore
```gitignore path="/.gitignore"
.eggs/
dist/
*.pyc
__pycache__/
*.py[cod]
*$py.class
__tmp/*
*.pyi
.mypycache
.ruff_cache
node_modules
demo/MobileNetSSD_deploy.caffemodel
demo/MobileNetSSD_deploy.prototxt.txt
demo/scratch
.gradio
.vscode
.DS_Store
.venv*
.env
```
## /CNAME
``` path="/CNAME"
fastrtc.org
```
## /LICENSE
``` path="/LICENSE"
MIT License
Copyright (c) 2024 Freddy Boulton
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```
## /README.md
FastRTC
The Real-Time Communication Library for Python.
Turn any Python function into a real-time audio and video stream over WebRTC or WebSockets.
## Installation
```bash
pip install fastrtc
```
To use the built-in pause detection (see [ReplyOnPause](https://fastrtc.org/userguide/audio/#reply-on-pause)) and text-to-speech (see [Text To Speech](https://fastrtc.org/userguide/audio/#text-to-speech)), install the `vad` and `tts` extras:
```bash
pip install "fastrtc[vad, tts]"
```
## Key Features
- 🗣️ Automatic Voice Detection and Turn Taking built in, so you only need to worry about the logic for responding to the user.
- 💻 Automatic UI - Use the `.ui.launch()` method to launch the WebRTC-enabled built-in Gradio UI.
- 🔌 Automatic WebRTC Support - Use the `.mount(app)` method to mount the stream on a FastAPI app and get a WebRTC endpoint for your own frontend!
- ⚡️ WebSocket Support - Use the `.mount(app)` method to mount the stream on a FastAPI app and get a WebSocket endpoint for your own frontend!
- 📞 Automatic Telephone Support - Use the `fastphone()` method of the stream to launch the application and get a free temporary phone number!
- 🤖 Completely customizable backend - A `Stream` can easily be mounted on a FastAPI app, so you can extend it to fit your production application. See the [Talk To Claude](https://huggingface.co/spaces/fastrtc/talk-to-claude) demo for an example of how to serve a custom JS frontend.
## Docs
[https://fastrtc.org](https://fastrtc.org)
## Examples
See the [Cookbook](https://fastrtc.org/cookbook/) for examples of how to use the library.
- 🗣️👀 **Gemini Audio Video Chat** - Stream BOTH your webcam video and audio feeds to Google Gemini. You can also upload images to augment your conversation!
- 🗣️ **Google Gemini Real Time Voice API** - Talk to Gemini in real time using Google's voice API.
- 🗣️ **OpenAI Real Time Voice API** - Talk to ChatGPT in real time using OpenAI's voice API.
- 🤖 **Hello Computer** - Say "computer" before asking your question!
- 🤖 **Llama Code Editor** - Create and edit HTML pages with just your voice! Powered by SambaNova Systems.
- 🗣️ **Talk to Claude** - Use the Anthropic and Play.Ht APIs to have an audio conversation with Claude.
- 🎵 **Whisper Transcription** - Have Whisper transcribe your speech in real time!
- 📷 **Yolov10 Object Detection** - Run the Yolov10 model on a user webcam stream in real time!
- 🗣️ **Kyutai Moshi** - Kyutai's Moshi is a novel speech-to-speech model for modeling human conversations.
- 🗣️ **Hello Llama: Stop Word Detection** - A code editor built with Llama 3.3 70b that is triggered by the phrase "Hello Llama". Build a Siri-like coding assistant in 100 lines of code!
## Usage
This is a shortened version of the official [usage guide](https://freddyaboulton.github.io/gradio-webrtc/user-guide/).
- `.ui.launch()`: Launch a built-in UI for easily testing and sharing your stream. Built with [Gradio](https://www.gradio.app/).
- `.fastphone()`: Get a free temporary phone number to call into your stream. Hugging Face token required.
- `.mount(app)`: Mount the stream on a [FastAPI](https://fastapi.tiangolo.com/) app. Perfect for integrating with your already existing production system.
## Quickstart
### Echo Audio
```python
from fastrtc import Stream, ReplyOnPause
import numpy as np
def echo(audio: tuple[int, np.ndarray]):
# The function will be passed the audio until the user pauses
# Implement any iterator that yields audio
# See "LLM Voice Chat" for a more complete example
yield audio
stream = Stream(
handler=ReplyOnPause(echo),
modality="audio",
mode="send-receive",
)
```
### LLM Voice Chat
```py
from fastrtc import (
ReplyOnPause, AdditionalOutputs, Stream,
audio_to_bytes, aggregate_bytes_to_16bit
)
import gradio as gr
import numpy as np
from groq import Groq
import anthropic
from elevenlabs import ElevenLabs
groq_client = Groq()
claude_client = anthropic.Anthropic()
tts_client = ElevenLabs()
# See "Talk to Claude" in Cookbook for an example of how to keep
# track of the chat history.
def response(
audio: tuple[int, np.ndarray],
):
prompt = groq_client.audio.transcriptions.create(
file=("audio-file.mp3", audio_to_bytes(audio)),
model="whisper-large-v3-turbo",
response_format="verbose_json",
).text
response = claude_client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
response_text = " ".join(
block.text
for block in response.content
if getattr(block, "type", None) == "text"
)
iterator = tts_client.text_to_speech.convert_as_stream(
text=response_text,
voice_id="JBFqnCBsd6RMkjVDRZzb",
model_id="eleven_multilingual_v2",
output_format="pcm_24000"
)
for chunk in aggregate_bytes_to_16bit(iterator):
audio_array = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
yield (24000, audio_array)
stream = Stream(
modality="audio",
mode="send-receive",
handler=ReplyOnPause(response),
)
```
### Webcam Stream
```python
from fastrtc import Stream
import numpy as np
def flip_vertically(image):
return np.flip(image, axis=0)
stream = Stream(
handler=flip_vertically,
modality="video",
mode="send-receive",
)
```
### Object Detection
```python
from fastrtc import Stream
import gradio as gr
import cv2
from huggingface_hub import hf_hub_download
from .inference import YOLOv10
model_file = hf_hub_download(
repo_id="onnx-community/yolov10n", filename="onnx/model.onnx"
)
# git clone https://huggingface.co/spaces/fastrtc/object-detection
# for YOLOv10 implementation
model = YOLOv10(model_file)
def detection(image, conf_threshold=0.3):
image = cv2.resize(image, (model.input_width, model.input_height))
new_image = model.detect_objects(image, conf_threshold)
return cv2.resize(new_image, (500, 500))
stream = Stream(
handler=detection,
modality="video",
mode="send-receive",
additional_inputs=[
gr.Slider(minimum=0, maximum=1, step=0.01, value=0.3)
]
)
```
## Running the Stream
Run the stream with any of the following:
### Gradio
```py
stream.ui.launch()
```
### Telephone (Audio Only)
```py
stream.fastphone()
```
### FastAPI
```py
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()
stream.mount(app)
# Optional: Add routes
@app.get("/")
async def _():
return HTMLResponse(content=open("index.html").read())
# uvicorn app:app --host 0.0.0.0 --port 8000
```
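The comment above shows the `uvicorn` CLI command; a minimal programmatic alternative (assuming `uvicorn` is installed, as it typically is alongside FastAPI) is:
```py
# Minimal sketch: serve the mounted FastAPI app from Python instead of the CLI.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```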
## /backend/fastrtc/__init__.py
```py path="/backend/fastrtc/__init__.py"
from .credentials import (
get_cloudflare_turn_credentials,
get_cloudflare_turn_credentials_async,
get_hf_turn_credentials,
get_hf_turn_credentials_async,
get_turn_credentials,
get_turn_credentials_async,
get_twilio_turn_credentials,
)
from .pause_detection import (
ModelOptions,
PauseDetectionModel,
SileroVadOptions,
get_silero_model,
)
from .reply_on_pause import AlgoOptions, ReplyOnPause
from .reply_on_stopwords import ReplyOnStopWords
from .speech_to_text import MoonshineSTT, get_stt_model
from .stream import Stream, UIArgs
from .text_to_speech import (
CartesiaTTSOptions,
KokoroTTSOptions,
get_tts_model,
)
from .tracks import (
AsyncAudioVideoStreamHandler,
AsyncStreamHandler,
AudioEmitType,
AudioVideoStreamHandler,
StreamHandler,
VideoEmitType,
VideoStreamHandler,
)
from .utils import (
AdditionalOutputs,
CloseStream,
Warning,
WebRTCError,
aggregate_bytes_to_16bit,
async_aggregate_bytes_to_16bit,
audio_to_bytes,
audio_to_file,
audio_to_float32,
audio_to_int16,
get_current_context,
wait_for_item,
)
from .webrtc import (
WebRTC,
)
__all__ = [
"AsyncStreamHandler",
"AudioVideoStreamHandler",
"AudioEmitType",
"AsyncAudioVideoStreamHandler",
"AlgoOptions",
"AdditionalOutputs",
"aggregate_bytes_to_16bit",
"async_aggregate_bytes_to_16bit",
"audio_to_bytes",
"audio_to_file",
"audio_to_float32",
"audio_to_int16",
"get_hf_turn_credentials",
"get_twilio_turn_credentials",
"get_turn_credentials",
"ReplyOnPause",
"ReplyOnStopWords",
"SileroVadOptions",
"get_stt_model",
"MoonshineSTT",
"StreamHandler",
"Stream",
"VideoEmitType",
"WebRTC",
"WebRTCError",
"Warning",
"get_tts_model",
"KokoroTTSOptions",
"get_cloudflare_turn_credentials_async",
"get_hf_turn_credentials_async",
"get_turn_credentials_async",
"get_cloudflare_turn_credentials",
"wait_for_item",
"UIArgs",
"ModelOptions",
"PauseDetectionModel",
"get_silero_model",
"SileroVadOptions",
"VideoStreamHandler",
"CloseStream",
"get_current_context",
"CartesiaTTSOptions",
]
```
## /backend/fastrtc/credentials.py
```py path="/backend/fastrtc/credentials.py"
import os
import warnings
from typing import Literal
import httpx
CLOUDFLARE_FASTRTC_TURN_URL = "https://turn.fastrtc.org/credentials"
async_httpx_client = httpx.AsyncClient()
def _format_response(response):
if response.is_success:
return response.json()
else:
raise Exception(
f"Failed to get TURN credentials: {response.status_code} {response.text}"
)
def get_hf_turn_credentials(token=None, ttl=600):
"""Retrieves TURN credentials from Hugging Face (deprecated).
This function fetches TURN server credentials using a Hugging Face token.
It is deprecated and `get_cloudflare_turn_credentials` should be used instead.
Args:
token (str, optional): Hugging Face API token. Defaults to None, in which
case the HF_TOKEN environment variable is used.
ttl (int, optional): Time-to-live for the credentials in seconds.
Defaults to 600.
Returns:
dict: A dictionary containing the TURN credentials.
Raises:
ValueError: If no token is provided and the HF_TOKEN environment variable
is not set.
Exception: If the request to the TURN server fails.
"""
warnings.warn(
"get_hf_turn_credentials is deprecated. Use get_cloudflare_turn_credentials instead.",
UserWarning,
)
if token is None:
token = os.getenv("HF_TOKEN")
if token is None:
raise ValueError(
"HF_TOKEN environment variable must be set or token must be provided to use get_hf_turn_credentials"
)
response = httpx.get(
CLOUDFLARE_FASTRTC_TURN_URL,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
params={"ttl": ttl},
)
return _format_response(response)
async def get_hf_turn_credentials_async(
token=None, ttl=600, client: httpx.AsyncClient | None = None
):
"""Asynchronously retrieves TURN credentials from Hugging Face (deprecated).
This function asynchronously fetches TURN server credentials using a Hugging Face
token. It is deprecated and `get_cloudflare_turn_credentials_async` should be
used instead.
Args:
token (str, optional): Hugging Face API token. Defaults to None, in which
case the HF_TOKEN environment variable is used.
ttl (int, optional): Time-to-live for the credentials in seconds.
Defaults to 600.
client (httpx.AsyncClient | None, optional): An existing httpx async client
to use for the request. If None, a default client is used. Defaults to None.
Returns:
dict: A dictionary containing the TURN credentials.
Raises:
ValueError: If no token is provided and the HF_TOKEN environment variable
is not set.
Exception: If the request to the TURN server fails.
"""
warnings.warn(
"get_hf_turn_credentials_async is deprecated. Use get_cloudflare_turn_credentials_async instead.",
UserWarning,
)
if client is None:
client = async_httpx_client
if token is None:
token = os.getenv("HF_TOKEN")
if token is None:
raise ValueError(
"HF_TOKEN environment variable must be set or token must be provided to use get_hf_turn_credentials"
)
async with client:
response = await client.get(
"https://turn.fastrtc.org/credentials",
headers={"Authorization": f"Bearer {token}"},
params={"ttl": ttl},
)
return _format_response(response)
def get_cloudflare_turn_credentials(
turn_key_id=None, turn_key_api_token=None, hf_token=None, ttl=600
):
"""Retrieves TURN credentials from Cloudflare or Hugging Face.
Fetches TURN server credentials either directly from Cloudflare using API keys
or via the Hugging Face TURN endpoint using an HF token. The HF token method
takes precedence if provided.
Args:
turn_key_id (str, optional): Cloudflare TURN key ID. Defaults to None,
in which case the CLOUDFLARE_TURN_KEY_ID environment variable is used.
turn_key_api_token (str, optional): Cloudflare TURN key API token.
Defaults to None, in which case the CLOUDFLARE_TURN_KEY_API_TOKEN
environment variable is used.
hf_token (str, optional): Hugging Face API token. If provided, this method
is used instead of Cloudflare keys. Defaults to None, in which case
the HF_TOKEN environment variable is used.
ttl (int, optional): Time-to-live for the credentials in seconds.
Defaults to 600.
Returns:
dict: A dictionary containing the TURN credentials (ICE servers).
Raises:
ValueError: If neither HF token nor Cloudflare keys (either as arguments
or environment variables) are provided.
Exception: If the request to the credential server fails.
"""
if hf_token is None:
hf_token = os.getenv("HF_TOKEN")
if hf_token is not None:
return httpx.get(
CLOUDFLARE_FASTRTC_TURN_URL,
headers={"Authorization": f"Bearer {hf_token}"},
params={"ttl": ttl},
).json()
else:
if turn_key_id is None or turn_key_api_token is None:
turn_key_id = os.getenv("CLOUDFLARE_TURN_KEY_ID")
turn_key_api_token = os.getenv("CLOUDFLARE_TURN_KEY_API_TOKEN")
if turn_key_id is None or turn_key_api_token is None:
raise ValueError(
"HF_TOKEN or CLOUDFLARE_TURN_KEY_ID and CLOUDFLARE_TURN_KEY_API_TOKEN must be set to use get_cloudflare_turn_credentials_sync"
)
response = httpx.post(
f"https://rtc.live.cloudflare.com/v1/turn/keys/{turn_key_id}/credentials/generate-ice-servers",
headers={
"Authorization": f"Bearer {turn_key_api_token}",
"Content-Type": "application/json",
},
json={"ttl": ttl},
)
if response.is_success:
return response.json()
else:
raise Exception(
f"Failed to get TURN credentials: {response.status_code} {response.text}"
)
async def get_cloudflare_turn_credentials_async(
turn_key_id=None,
turn_key_api_token=None,
hf_token=None,
ttl=600,
client: httpx.AsyncClient | None = None,
):
"""Asynchronously retrieves TURN credentials from Cloudflare or Hugging Face.
Asynchronously fetches TURN server credentials either directly from Cloudflare
using API keys or via the Hugging Face TURN endpoint using an HF token. The HF
token method takes precedence if provided.
Args:
turn_key_id (str, optional): Cloudflare TURN key ID. Defaults to None,
in which case the CLOUDFLARE_TURN_KEY_ID environment variable is used.
turn_key_api_token (str, optional): Cloudflare TURN key API token.
Defaults to None, in which case the CLOUDFLARE_TURN_KEY_API_TOKEN
environment variable is used.
hf_token (str, optional): Hugging Face API token. If provided, this method
is used instead of Cloudflare keys. Defaults to None, in which case
the HF_TOKEN environment variable is used.
ttl (int, optional): Time-to-live for the credentials in seconds.
Defaults to 600.
client (httpx.AsyncClient | None, optional): An existing httpx async client
to use for the request. If None, a new client is created per request.
Defaults to None.
Returns:
dict: A dictionary containing the TURN credentials (ICE servers).
Raises:
ValueError: If neither HF token nor Cloudflare keys (either as arguments
or environment variables) are provided.
Exception: If the request to the credential server fails.
"""
if client is None:
client = async_httpx_client
if hf_token is None:
hf_token = os.getenv("HF_TOKEN", "").strip()
if hf_token is not None:
async with httpx.AsyncClient() as client:
response = await client.get(
CLOUDFLARE_FASTRTC_TURN_URL,
headers={"Authorization": f"Bearer {hf_token}"},
params={"ttl": ttl},
)
return _format_response(response)
else:
if turn_key_id is None or turn_key_api_token is None:
turn_key_id = os.getenv("CLOUDFLARE_TURN_KEY_ID")
turn_key_api_token = os.getenv("CLOUDFLARE_TURN_KEY_API_TOKEN")
if turn_key_id is None or turn_key_api_token is None:
raise ValueError(
"HF_TOKEN or CLOUDFLARE_TURN_KEY_ID and CLOUDFLARE_TURN_KEY_API_TOKEN must be set to use get_cloudflare_turn_credentials"
)
async with httpx.AsyncClient() as client:
response = await client.post(
f"https://rtc.live.cloudflare.com/v1/turn/keys/{turn_key_id}/credentials/generate-ice-servers",
headers={
"Authorization": f"Bearer {turn_key_api_token}",
"Content-Type": "application/json",
},
json={"ttl": ttl},
)
if response.is_success:
return response.json()
else:
raise Exception(
f"Failed to get TURN credentials: {response.status_code} {response.text}"
)
def get_twilio_turn_credentials(twilio_sid=None, twilio_token=None):
"""Retrieves TURN credentials from Twilio.
Uses the Twilio REST API to generate temporary TURN credentials. Requires
the `twilio` package to be installed.
Args:
twilio_sid (str, optional): Twilio Account SID. Defaults to None, in which
case the TWILIO_ACCOUNT_SID environment variable is used.
twilio_token (str, optional): Twilio Auth Token. Defaults to None, in which
case the TWILIO_AUTH_TOKEN environment variable is used.
Returns:
dict: A dictionary containing the TURN credentials formatted for WebRTC,
including 'iceServers' and 'iceTransportPolicy'.
Raises:
ImportError: If the `twilio` package is not installed.
ValueError: If Twilio credentials (SID and token) are not provided either
as arguments or environment variables.
TwilioRestException: If the Twilio API request fails.
"""
try:
from twilio.rest import Client
except ImportError:
raise ImportError("Please install twilio with `pip install twilio`")
if not twilio_sid and not twilio_token:
twilio_sid = os.environ.get("TWILIO_ACCOUNT_SID")
twilio_token = os.environ.get("TWILIO_AUTH_TOKEN")
client = Client(twilio_sid, twilio_token)
token = client.tokens.create()
return {
"iceServers": token.ice_servers,
"iceTransportPolicy": "relay",
}
def get_turn_credentials(
method: Literal["hf", "twilio", "cloudflare"] = "cloudflare", **kwargs
):
"""Retrieves TURN credentials from the specified provider.
Acts as a dispatcher function to call the appropriate credential retrieval
function based on the method specified.
Args:
method (Literal["hf", "twilio", "cloudflare"], optional): The provider
to use. 'hf' uses the deprecated Hugging Face endpoint. 'cloudflare'
uses either Cloudflare keys or the HF endpoint. 'twilio' uses the
Twilio API. Defaults to "cloudflare".
**kwargs: Additional keyword arguments passed directly to the underlying
provider-specific function (e.g., `token`, `ttl` for 'hf';
`twilio_sid`, `twilio_token` for 'twilio'; `turn_key_id`,
`turn_key_api_token`, `hf_token`, `ttl` for 'cloudflare').
Returns:
dict: A dictionary containing the TURN credentials from the chosen provider.
Raises:
ValueError: If an invalid method is specified.
Also raises exceptions from the underlying provider functions (see their
docstrings).
"""
if method == "hf":
warnings.warn(
"Method 'hf' is deprecated. Use 'cloudflare' instead.", UserWarning
)
# Ensure only relevant kwargs are passed
hf_kwargs = {k: v for k, v in kwargs.items() if k in ["token", "ttl"]}
return get_hf_turn_credentials(**hf_kwargs)
elif method == "cloudflare":
# Ensure only relevant kwargs are passed
cf_kwargs = {
k: v
for k, v in kwargs.items()
if k in ["turn_key_id", "turn_key_api_token", "hf_token", "ttl"]
}
return get_cloudflare_turn_credentials(**cf_kwargs)
elif method == "twilio":
# Ensure only relevant kwargs are passed
twilio_kwargs = {
k: v for k, v in kwargs.items() if k in ["twilio_sid", "twilio_token"]
}
return get_twilio_turn_credentials(**twilio_kwargs)
else:
raise ValueError("Invalid method. Must be 'hf', 'twilio', or 'cloudflare'")
async def get_turn_credentials_async(
method: Literal["hf", "twilio", "cloudflare"] = "cloudflare", **kwargs
):
"""Asynchronously retrieves TURN credentials from the specified provider.
Acts as an async dispatcher function to call the appropriate async credential
retrieval function based on the method specified.
Args:
method (Literal["hf", "twilio", "cloudflare"], optional): The provider
to use. 'hf' uses the deprecated Hugging Face endpoint. 'cloudflare'
uses either Cloudflare keys or the HF endpoint. 'twilio' is not
supported asynchronously by this function yet. Defaults to "cloudflare".
**kwargs: Additional keyword arguments passed directly to the underlying
provider-specific async function (e.g., `token`, `ttl`, `client` for 'hf';
`turn_key_id`, `turn_key_api_token`, `hf_token`, `ttl`, `client` for
'cloudflare').
Returns:
dict: A dictionary containing the TURN credentials from the chosen provider.
Raises:
ValueError: If an invalid or unsupported method is specified (currently
'twilio' is not supported asynchronously here).
NotImplementedError: If method 'twilio' is requested.
Also raises exceptions from the underlying provider functions (see their
docstrings).
"""
if method == "hf":
warnings.warn(
"Method 'hf' is deprecated. Use 'cloudflare' instead.", UserWarning
)
# Ensure only relevant kwargs are passed
hf_kwargs = {k: v for k, v in kwargs.items() if k in ["token", "ttl", "client"]}
return await get_hf_turn_credentials_async(**hf_kwargs)
elif method == "cloudflare":
# Ensure only relevant kwargs are passed
cf_kwargs = {
k: v
for k, v in kwargs.items()
if k in ["turn_key_id", "turn_key_api_token", "hf_token", "ttl", "client"]
}
return await get_cloudflare_turn_credentials_async(**cf_kwargs)
elif method == "twilio":
# Twilio client library doesn't have a standard async interface for this.
# You might need to run the sync version in an executor or use a different library.
raise NotImplementedError(
"Async retrieval for Twilio credentials is not implemented."
)
else:
raise ValueError("Invalid method. Must be 'hf', 'twilio', or 'cloudflare'")
```
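These helpers all return an ICE-server configuration dictionary for the WebRTC client. A hedged sketch of wiring one into a `Stream` (assumptions: `HF_TOKEN` or the Cloudflare TURN keys are set in the environment, and `Stream` accepts the `rtc_configuration` argument described in the FastRTC deployment docs, which may also be a callable such as `get_cloudflare_turn_credentials_async`):
```python
# Hedged sketch: hand TURN credentials to a Stream for deployments behind NAT.
# Assumes HF_TOKEN or CLOUDFLARE_TURN_KEY_ID / CLOUDFLARE_TURN_KEY_API_TOKEN is set.
from fastrtc import ReplyOnPause, Stream, get_cloudflare_turn_credentials


def echo(audio):
    yield audio


stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive",
    # Passing the function (rather than its result) lets fresh, short-lived
    # credentials be fetched for each new connection.
    rtc_configuration=get_cloudflare_turn_credentials,
)
```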
## /backend/fastrtc/pause_detection/__init__.py
```py path="/backend/fastrtc/pause_detection/__init__.py"
from .protocol import ModelOptions, PauseDetectionModel
from .silero import SileroVADModel, SileroVadOptions, get_silero_model
__all__ = [
"SileroVADModel",
"SileroVadOptions",
"PauseDetectionModel",
"ModelOptions",
"get_silero_model",
]
```
## /backend/fastrtc/pause_detection/protocol.py
```py path="/backend/fastrtc/pause_detection/protocol.py"
from typing import Any, Protocol, TypeAlias
import numpy as np
from numpy.typing import NDArray
from ..utils import AudioChunk
ModelOptions: TypeAlias = Any
class PauseDetectionModel(Protocol):
def vad(
self,
audio: tuple[int, NDArray[np.int16] | NDArray[np.float32]],
options: ModelOptions,
) -> tuple[float, list[AudioChunk]]: ...
def warmup(
self,
) -> None: ...
```
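Because `PauseDetectionModel` is a structural `Protocol`, any object with matching `vad` and `warmup` methods can replace the Silero model. A hedged sketch using a naive energy-based detector (the 512-sample window and 0.01 RMS threshold are illustrative only; chunks are plain dicts with `start`/`end` sample indices, mirroring how `silero.py` builds them):
```python
# Hedged sketch: a custom PauseDetectionModel based on a crude RMS-energy test.
# Illustrative only -- the window size and threshold are arbitrary choices.
import numpy as np

from fastrtc import audio_to_float32


class EnergyVAD:
    def vad(self, audio, options=None):
        sr, samples = audio
        samples = audio_to_float32(samples).reshape(-1)
        window, chunks, speech_samples = 512, [], 0
        for start in range(0, len(samples), window):
            frame = samples[start : start + window]
            if np.sqrt(np.mean(frame**2)) > 0.01:  # treat loud frames as speech
                chunks.append({"start": start, "end": start + len(frame)})
                speech_samples += len(frame)
        return speech_samples / sr, chunks

    def warmup(self) -> None:
        self.vad((16000, np.zeros(16000, dtype=np.float32)))
```
An instance could then be passed as the `model` argument of `ReplyOnPause` (see `reply_on_pause.py` below).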
## /backend/fastrtc/pause_detection/silero.py
```py path="/backend/fastrtc/pause_detection/silero.py"
import logging
import warnings
from dataclasses import dataclass
from functools import lru_cache
import click
import numpy as np
from huggingface_hub import hf_hub_download
from numpy.typing import NDArray
from ..utils import AudioChunk, audio_to_float32
from .protocol import PauseDetectionModel
logger = logging.getLogger(__name__)
# The code below is adapted from https://github.com/snakers4/silero-vad.
# The code below is adapted from https://github.com/gpt-omni/mini-omni/blob/main/utils/vad.py
@lru_cache
def get_silero_model() -> PauseDetectionModel:
"""Returns the VAD model instance and warms it up with dummy data."""
# Warm up the model with dummy data
try:
import importlib.util
mod = importlib.util.find_spec("onnxruntime")
if mod is None:
raise RuntimeError("Install fastrtc[vad] to use ReplyOnPause")
except (ValueError, ModuleNotFoundError):
raise RuntimeError("Install fastrtc[vad] to use ReplyOnPause")
model = SileroVADModel()
print(click.style("INFO", fg="green") + ":\t Warming up VAD model.")
model.warmup()
print(click.style("INFO", fg="green") + ":\t VAD model warmed up.")
return model
@dataclass
class SileroVadOptions:
"""VAD options.
Attributes:
threshold: Speech threshold. Silero VAD outputs speech probabilities for each audio chunk,
probabilities ABOVE this value are considered as SPEECH. It is better to tune this
parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets.
min_speech_duration_ms: Final speech chunks shorter min_speech_duration_ms are thrown out.
max_speech_duration_s: Maximum duration of speech chunks in seconds. Chunks longer
than max_speech_duration_s will be split at the timestamp of the last silence that
lasts more than 100ms (if any), to prevent aggressive cutting. Otherwise, they will be
split aggressively just before max_speech_duration_s.
min_silence_duration_ms: In the end of each speech chunk wait for min_silence_duration_ms
before separating it
window_size_samples: Audio chunks of window_size_samples size are fed to the silero VAD model.
WARNING! Silero VAD models were trained using 512, 1024, 1536 samples for 16000 sample rate.
Values other than these may affect model performance!!
speech_pad_ms: Final speech chunks are padded by speech_pad_ms each side
speech_duration: If the length of the speech is less than this value, a pause will be detected.
"""
threshold: float = 0.5
min_speech_duration_ms: int = 250
max_speech_duration_s: float = float("inf")
min_silence_duration_ms: int = 2000
window_size_samples: int = 1024
speech_pad_ms: int = 400
class SileroVADModel:
@staticmethod
def download_model() -> str:
return hf_hub_download(
repo_id="freddyaboulton/silero-vad", filename="silero_vad.onnx"
)
def __init__(self):
try:
import onnxruntime
except ImportError as e:
raise RuntimeError(
"Applying the VAD filter requires the onnxruntime package"
) from e
path = self.download_model()
opts = onnxruntime.SessionOptions()
opts.inter_op_num_threads = 1
opts.intra_op_num_threads = 1
opts.log_severity_level = 4
self.session = onnxruntime.InferenceSession(
path,
providers=["CPUExecutionProvider"],
sess_options=opts,
)
def get_initial_state(self, batch_size: int):
h = np.zeros((2, batch_size, 64), dtype=np.float32)
c = np.zeros((2, batch_size, 64), dtype=np.float32)
return h, c
@staticmethod
def collect_chunks(audio: np.ndarray, chunks: list[AudioChunk]) -> np.ndarray:
"""Collects and concatenates audio chunks."""
if not chunks:
return np.array([], dtype=np.float32)
return np.concatenate(
[audio[chunk["start"] : chunk["end"]] for chunk in chunks]
)
def get_speech_timestamps(
self,
audio: np.ndarray,
vad_options: SileroVadOptions,
**kwargs,
) -> list[AudioChunk]:
"""This method is used for splitting long audios into speech chunks using silero VAD.
Args:
audio: One dimensional float array.
vad_options: Options for VAD processing.
kwargs: VAD options passed as keyword arguments for backward compatibility.
Returns:
List of dicts containing begin and end samples of each speech chunk.
"""
threshold = vad_options.threshold
min_speech_duration_ms = vad_options.min_speech_duration_ms
max_speech_duration_s = vad_options.max_speech_duration_s
min_silence_duration_ms = vad_options.min_silence_duration_ms
window_size_samples = vad_options.window_size_samples
speech_pad_ms = vad_options.speech_pad_ms
if window_size_samples not in [512, 1024, 1536]:
warnings.warn(
"Unusual window_size_samples! Supported window_size_samples:\n"
" - [512, 1024, 1536] for 16000 sampling_rate"
)
sampling_rate = 16000
min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
speech_pad_samples = sampling_rate * speech_pad_ms / 1000
max_speech_samples = (
sampling_rate * max_speech_duration_s
- window_size_samples
- 2 * speech_pad_samples
)
min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
min_silence_samples_at_max_speech = sampling_rate * 98 / 1000
audio_length_samples = len(audio)
state = self.get_initial_state(batch_size=1)
speech_probs = []
for current_start_sample in range(0, audio_length_samples, window_size_samples):
chunk = audio[
current_start_sample : current_start_sample + window_size_samples
]
if len(chunk) < window_size_samples:
chunk = np.pad(chunk, (0, int(window_size_samples - len(chunk))))
speech_prob, state = self(chunk, state, sampling_rate)
speech_probs.append(speech_prob)
triggered = False
speeches = []
current_speech = {}
neg_threshold = threshold - 0.15
# to save potential segment end (and tolerate some silence)
temp_end = 0
# to save potential segment limits in case of maximum segment size reached
prev_end = next_start = 0
for i, speech_prob in enumerate(speech_probs):
if (speech_prob >= threshold) and temp_end:
temp_end = 0
if next_start < prev_end:
next_start = window_size_samples * i
if (speech_prob >= threshold) and not triggered:
triggered = True
current_speech["start"] = window_size_samples * i
continue
if (
triggered
and (window_size_samples * i) - current_speech["start"]
> max_speech_samples
):
if prev_end:
current_speech["end"] = prev_end
speeches.append(current_speech)
current_speech = {}
# previously reached silence (< neg_thres) and is still not speech (< thres)
if next_start < prev_end:
triggered = False
else:
current_speech["start"] = next_start
prev_end = next_start = temp_end = 0
else:
current_speech["end"] = window_size_samples * i
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if (speech_prob < neg_threshold) and triggered:
if not temp_end:
temp_end = window_size_samples * i
# condition to avoid cutting in very short silence
if (
window_size_samples * i
) - temp_end > min_silence_samples_at_max_speech:
prev_end = temp_end
if (window_size_samples * i) - temp_end < min_silence_samples:
continue
else:
current_speech["end"] = temp_end
if (
current_speech["end"] - current_speech["start"]
) > min_speech_samples:
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if (
current_speech
and (audio_length_samples - current_speech["start"]) > min_speech_samples
):
current_speech["end"] = audio_length_samples
speeches.append(current_speech)
for i, speech in enumerate(speeches):
if i == 0:
speech["start"] = int(max(0, speech["start"] - speech_pad_samples))
if i != len(speeches) - 1:
silence_duration = speeches[i + 1]["start"] - speech["end"]
if silence_duration < 2 * speech_pad_samples:
speech["end"] += int(silence_duration // 2)
speeches[i + 1]["start"] = int(
max(0, speeches[i + 1]["start"] - silence_duration // 2)
)
else:
speech["end"] = int(
min(audio_length_samples, speech["end"] + speech_pad_samples)
)
speeches[i + 1]["start"] = int(
max(0, speeches[i + 1]["start"] - speech_pad_samples)
)
else:
speech["end"] = int(
min(audio_length_samples, speech["end"] + speech_pad_samples)
)
return speeches
def warmup(self):
for _ in range(10):
dummy_audio = np.zeros(102400, dtype=np.float32)
self.vad((24000, dummy_audio), None)
def vad(
self,
audio: tuple[int, NDArray[np.float32] | NDArray[np.int16]],
options: None | SileroVadOptions,
) -> tuple[float, list[AudioChunk]]:
sampling_rate, audio_ = audio
logger.debug("VAD audio shape input: %s", audio_.shape)
try:
audio_ = audio_to_float32(audio_)
sr = 16000
if sr != sampling_rate:
try:
import librosa # type: ignore
except ImportError as e:
raise RuntimeError(
"Applying the VAD filter requires the librosa if the input sampling rate is not 16000hz"
) from e
audio_ = librosa.resample(audio_, orig_sr=sampling_rate, target_sr=sr)
if not options:
options = SileroVadOptions()
speech_chunks = self.get_speech_timestamps(audio_, options)
logger.debug("VAD speech chunks: %s", speech_chunks)
audio_ = self.collect_chunks(audio_, speech_chunks)
logger.debug("VAD audio shape: %s", audio_.shape)
duration_after_vad = audio_.shape[0] / sr
return duration_after_vad, speech_chunks
except Exception as e:
import math
import traceback
logger.debug("VAD Exception: %s", str(e))
exec = traceback.format_exc()
logger.debug("traceback %s", exec)
return math.inf, []
def __call__(self, x, state, sr: int):
if len(x.shape) == 1:
x = np.expand_dims(x, 0)
if len(x.shape) > 2:
raise ValueError(
f"Too many dimensions for input audio chunk {len(x.shape)}"
)
if sr / x.shape[1] > 31.25: # type: ignore
raise ValueError("Input audio chunk is too short")
h, c = state
ort_inputs = {
"input": x,
"h": h,
"c": c,
"sr": np.array(sr, dtype="int64"),
}
out, h, c = self.session.run(None, ort_inputs)
state = (h, c)
return out, state
```
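A hedged sketch of calling the Silero model directly, outside a stream handler. It assumes the `vad` extra (onnxruntime) is installed; the silence array stands in for a real 16 kHz mono recording, and the option values are illustrative rather than recommended defaults:
```python
# Hedged sketch: run Silero VAD on a clip and inspect the detected speech chunks.
import numpy as np

from fastrtc import SileroVadOptions, get_silero_model

model = get_silero_model()  # downloads the ONNX weights and warms up on first call
sr = 16000
audio = np.zeros(sr * 3, dtype=np.float32)  # placeholder for a real recording

options = SileroVadOptions(threshold=0.6, min_silence_duration_ms=1500)
speech_duration, chunks = model.vad((sr, audio), options)
print(f"{speech_duration:.2f}s of speech across {len(chunks)} chunk(s)")
```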
## /backend/fastrtc/py.typed
```typed path="/backend/fastrtc/py.typed"
```
## /backend/fastrtc/reply_on_pause.py
```py path="/backend/fastrtc/reply_on_pause.py"
import asyncio
import inspect
from collections.abc import AsyncGenerator, Callable, Generator
from dataclasses import dataclass, field
from logging import getLogger
from threading import Event
from typing import Any, Literal, cast
import numpy as np
from numpy.typing import NDArray
from .pause_detection import ModelOptions, PauseDetectionModel, get_silero_model
from .tracks import EmitType, StreamHandler
from .utils import AdditionalOutputs, create_message, split_output
logger = getLogger(__name__)
@dataclass
class AlgoOptions:
"""Algorithm options."""
audio_chunk_duration: float = 0.6
started_talking_threshold: float = 0.2
speech_threshold: float = 0.1
@dataclass
class AppState:
stream: np.ndarray | None = None
sampling_rate: int = 0
pause_detected: bool = False
started_talking: bool = False
responding: bool = False
stopped: bool = False
buffer: np.ndarray | None = None
responded_audio: bool = False
interrupted: asyncio.Event = field(default_factory=asyncio.Event)
def new(self):
return AppState()
ReplyFnGenerator = (
Callable[
[tuple[int, NDArray[np.int16]], Any],
Generator[EmitType, None, None],
]
| Callable[
[tuple[int, NDArray[np.int16]]],
Generator[EmitType, None, None],
]
| Callable[
[tuple[int, NDArray[np.int16]]],
AsyncGenerator[EmitType, None],
]
| Callable[
[tuple[int, NDArray[np.int16]], Any],
AsyncGenerator[EmitType, None],
]
)
async def iterate(generator: Generator) -> Any:
return next(generator)
class ReplyOnPause(StreamHandler):
"""
A stream handler that processes incoming audio, detects pauses,
and triggers a reply function (`fn`) when a pause is detected.
This handler accumulates audio chunks, uses a Voice Activity Detection (VAD)
model to determine speech segments, and identifies pauses based on configurable
thresholds. Once a pause is detected after speech has started, it calls the
provided generator function `fn` with the accumulated audio.
It can optionally run a `startup_fn` at the beginning and supports interruption
of the reply function if new audio arrives.
Attributes:
fn (ReplyFnGenerator): The generator function to call when a pause is detected.
startup_fn (Callable | None): An optional function to run at startup.
algo_options (AlgoOptions): Configuration for the pause detection algorithm.
model_options (ModelOptions | None): Configuration for the VAD model.
can_interrupt (bool): Whether incoming audio can interrupt the `fn` execution.
expected_layout (Literal["mono", "stereo"]): Expected audio channel layout.
output_sample_rate (int): Sample rate for the output audio from `fn`.
input_sample_rate (int): Expected sample rate of the input audio.
model (PauseDetectionModel): The VAD model instance.
state (AppState): The current state of the pause detection logic.
generator (Generator | AsyncGenerator | None): The active generator instance from `fn`.
event (Event): Threading event used to signal pause detection.
loop (asyncio.AbstractEventLoop): The asyncio event loop.
"""
def __init__(
self,
fn: ReplyFnGenerator,
startup_fn: Callable | None = None,
algo_options: AlgoOptions | None = None,
model_options: ModelOptions | None = None,
can_interrupt: bool = True,
expected_layout: Literal["mono", "stereo"] = "mono",
output_sample_rate: int = 24000,
output_frame_size: int | None = None, # Deprecated
input_sample_rate: int = 48000,
model: PauseDetectionModel | None = None,
):
"""
Initializes the ReplyOnPause handler.
Args:
fn: The generator function to execute upon pause detection.
It receives `(sample_rate, audio_array)` and optionally `*args`.
startup_fn: An optional function to run once at the beginning.
algo_options: Options for the pause detection algorithm.
model_options: Options for the VAD model.
can_interrupt: If True, incoming audio during `fn` execution
will stop the generator and process the new audio.
expected_layout: Expected input audio layout ('mono' or 'stereo').
output_sample_rate: The sample rate expected for audio yielded by `fn`.
output_frame_size: Deprecated.
input_sample_rate: The expected sample rate of incoming audio.
model: An optional pre-initialized VAD model instance.
"""
super().__init__(
expected_layout,
output_sample_rate,
output_frame_size,
input_sample_rate=input_sample_rate,
)
self.can_interrupt = can_interrupt
self.expected_layout: Literal["mono", "stereo"] = expected_layout
self.model = model or get_silero_model()
self.fn = fn
self.is_async = inspect.isasyncgenfunction(fn)
self.event = Event()
self.state = AppState()
self.generator: (
Generator[EmitType, None, None] | AsyncGenerator[EmitType, None] | None
) = None
self.model_options = model_options
self.algo_options = algo_options or AlgoOptions()
self.startup_fn = startup_fn
@property
def _needs_additional_inputs(self) -> bool:
"""Checks if the reply function `fn` expects additional arguments."""
return len(inspect.signature(self.fn).parameters) > 1
def start_up(self):
"""
Executes the startup function `startup_fn` if provided.
Waits for additional arguments if `_needs_additional_inputs` is True
before calling `startup_fn`. Sets the `event` after completion.
"""
if self.startup_fn:
if self._needs_additional_inputs:
self.wait_for_args_sync()
args = self.latest_args[1:]
else:
args = ()
self.generator = self.startup_fn(*args)
self.event.set()
def copy(self):
"""Creates a new instance of ReplyOnPause with the same configuration."""
return ReplyOnPause(
self.fn,
self.startup_fn,
self.algo_options,
self.model_options,
self.can_interrupt,
self.expected_layout,
self.output_sample_rate,
self.output_frame_size,
self.input_sample_rate,
self.model,
)
def determine_pause(
self, audio: np.ndarray, sampling_rate: int, state: AppState
) -> bool:
"""
Analyzes an audio chunk to detect if a significant pause occurred after speech.
Uses the VAD model to measure speech duration within the chunk. Updates the
application state (`state`) regarding whether talking has started and
accumulates speech segments.
Args:
audio: The numpy array containing the audio chunk.
sampling_rate: The sample rate of the audio chunk.
state: The current application state.
Returns:
True if a pause satisfying the configured thresholds is detected
after speech has started, False otherwise.
"""
duration = len(audio) / sampling_rate
if duration >= self.algo_options.audio_chunk_duration:
dur_vad, _ = self.model.vad((sampling_rate, audio), self.model_options)
logger.debug("VAD duration: %s", dur_vad)
if (
dur_vad > self.algo_options.started_talking_threshold
and not state.started_talking
):
state.started_talking = True
logger.debug("Started talking")
self.send_message_sync(create_message("log", "started_talking"))
if state.started_talking:
if state.stream is None:
state.stream = audio
else:
state.stream = np.concatenate((state.stream, audio))
state.buffer = None
if dur_vad < self.algo_options.speech_threshold and state.started_talking:
return True
return False
def process_audio(self, audio: tuple[int, np.ndarray], state: AppState) -> None:
"""
Processes an incoming audio frame.
Appends the frame to the buffer, runs pause detection on the buffer,
and updates the application state.
Args:
audio: A tuple containing the sample rate and the audio frame data.
state: The current application state to update.
"""
frame_rate, array = audio
array = np.squeeze(array)
if not state.sampling_rate:
state.sampling_rate = frame_rate
if state.buffer is None:
state.buffer = array
else:
state.buffer = np.concatenate((state.buffer, array))
pause_detected = self.determine_pause(
state.buffer, state.sampling_rate, self.state
)
state.pause_detected = pause_detected
def receive(self, frame: tuple[int, np.ndarray]) -> None:
"""
Receives an audio frame from the stream.
Processes the audio frame using `process_audio`. If a pause is detected,
it sets the `event`. If interruption is enabled and a reply is ongoing,
it closes the current generator and clears the processing queue.
Args:
frame: A tuple containing the sample rate and the audio frame data.
"""
if self.state.responding and not self.can_interrupt:
return
self.process_audio(frame, self.state)
if self.state.pause_detected:
self.event.set()
if self.can_interrupt and self.state.responding:
self._close_generator()
self.generator = None
if self.can_interrupt:
self.clear_queue()
def _close_generator(self):
"""
Safely closes the active reply generator (`self.generator`).
Handles both synchronous and asynchronous generators, ensuring proper
resource cleanup (e.g., calling `aclose()` or `close()`).
Logs any errors during closure.
"""
if self.generator is None:
return
try:
if self.is_async:
# For async generators, we need to call aclose()
if hasattr(self.generator, "aclose"):
asyncio.run_coroutine_threadsafe(
cast(AsyncGenerator[EmitType, None], self.generator).aclose(),
self.loop,
).result(timeout=1.0) # Add timeout to prevent blocking
else:
# For sync generators, we can just exhaust it or close it
if hasattr(self.generator, "close"):
cast(Generator[EmitType, None, None], self.generator).close()
except Exception as e:
logger.debug(f"Error closing generator: {e}")
def reset(self):
"""
Resets the handler state to its initial condition.
Clears accumulated audio, resets state flags, closes any active generator,
and clears the event flag. Also handles resetting argument state for phone mode.
"""
super().reset()
if self.phone_mode:
self.args_set.set()
self.generator = None
self.event.clear()
self.state = AppState()
def trigger_response(self):
"""
Manually triggers the response generation process.
Sets the event flag, effectively simulating a pause detection.
Initializes the stream buffer if it's empty.
"""
self.event.set()
if self.state.stream is None:
self.state.stream = np.array([], dtype=np.int16)
async def async_iterate(self, generator) -> EmitType:
"""Helper function to get the next item from an async generator."""
return await anext(generator)
def emit(self):
"""
Produces the next output chunk from the reply generator (`fn`).
This method is called repeatedly after a pause is detected (event is set).
If the generator is not already running, it initializes it by calling `fn`
with the accumulated audio and any required additional arguments.
It then yields the next item from the generator. Handles both sync and
async generators. Resets the state upon generator completion or error.
Returns:
The next output item from the generator, or None if no pause event
has occurred or the generator is exhausted.
Raises:
Exception: Re-raises exceptions occurring within the `fn` generator.
"""
if not self.event.is_set():
return None
else:
if not self.generator:
self.send_message_sync(create_message("log", "pause_detected"))
if self._needs_additional_inputs and not self.args_set.is_set():
if not self.phone_mode:
self.wait_for_args_sync()
else:
self.latest_args = [None]
self.args_set.set()
logger.debug("Creating generator")
audio = cast(np.ndarray, self.state.stream).reshape(1, -1)
if self._needs_additional_inputs:
self.latest_args[0] = (self.state.sampling_rate, audio)
self.generator = self.fn(*self.latest_args) # type: ignore
else:
self.generator = self.fn((self.state.sampling_rate, audio)) # type: ignore
logger.debug("Latest args: %s", self.latest_args)
self.state = self.state.new()
self.state.responding = True
try:
if self.is_async:
output = asyncio.run_coroutine_threadsafe(
self.async_iterate(self.generator), self.loop
).result()
else:
output = next(self.generator) # type: ignore
audio, additional_outputs = split_output(output)
if audio is not None:
self.send_message_sync(create_message("log", "response_starting"))
self.state.responded_audio = True
if self.phone_mode:
if isinstance(additional_outputs, AdditionalOutputs):
self.latest_args = [None] + list(additional_outputs.args)
return output
except (StopIteration, StopAsyncIteration):
if not self.state.responded_audio:
self.send_message_sync(create_message("log", "response_starting"))
self.reset()
except Exception as e:
import traceback
traceback.print_exc()
logger.debug("Error in ReplyOnPause: %s", e)
self.reset()
raise e
```
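A hedged sketch of tuning `ReplyOnPause` through `AlgoOptions` and `SileroVadOptions` when building a stream; the numbers shown restate or slightly tweak the defaults documented above:
```python
# Hedged sketch: configure pause detection explicitly instead of using defaults.
from fastrtc import AlgoOptions, ReplyOnPause, SileroVadOptions, Stream


def echo(audio):
    # Receives the accumulated (sample_rate, audio_array) once a pause is detected.
    yield audio


stream = Stream(
    handler=ReplyOnPause(
        echo,
        algo_options=AlgoOptions(
            audio_chunk_duration=0.6,       # seconds buffered before each VAD pass
            started_talking_threshold=0.2,  # speech needed to count as "talking"
            speech_threshold=0.1,           # below this, a pause is declared
        ),
        model_options=SileroVadOptions(threshold=0.5, min_silence_duration_ms=1500),
        can_interrupt=True,
    ),
    modality="audio",
    mode="send-receive",
)
```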
## /backend/fastrtc/reply_on_stopwords.py
```py path="/backend/fastrtc/reply_on_stopwords.py"
import asyncio
import logging
import re
from collections.abc import Callable
from typing import Literal
import numpy as np
from .reply_on_pause import (
AlgoOptions,
AppState,
ModelOptions,
PauseDetectionModel,
ReplyFnGenerator,
ReplyOnPause,
)
from .speech_to_text import get_stt_model, stt_for_chunks
from .utils import audio_to_float32, create_message
logger = logging.getLogger(__name__)
class ReplyOnStopWordsState(AppState):
"""Extends AppState to include state specific to stop word detection."""
stop_word_detected: bool = False
post_stop_word_buffer: np.ndarray | None = None
started_talking_pre_stop_word: bool = False
def new(self):
"""Creates a new instance of ReplyOnStopWordsState."""
return ReplyOnStopWordsState()
class ReplyOnStopWords(ReplyOnPause):
"""
A stream handler that extends ReplyOnPause to trigger based on stop words
followed by a pause.
This handler listens to the incoming audio stream, performs Speech-to-Text (STT)
to detect predefined stop words. Once a stop word is detected, it waits for a
subsequent pause in speech (using the VAD model) before triggering the reply
function (`fn`) with the audio recorded *after* the stop word.
Attributes:
stop_words (list[str]): A list of words or phrases that trigger the pause detection.
state (ReplyOnStopWordsState): The current state of the stop word and pause detection logic.
stt_model: The Speech-to-Text model instance used for detecting stop words.
"""
def __init__(
self,
fn: ReplyFnGenerator,
stop_words: list[str],
startup_fn: Callable | None = None,
algo_options: AlgoOptions | None = None,
model_options: ModelOptions | None = None,
can_interrupt: bool = True,
expected_layout: Literal["mono", "stereo"] = "mono",
output_sample_rate: int = 24000,
output_frame_size: int | None = None, # Deprecated
input_sample_rate: int = 48000,
model: PauseDetectionModel | None = None,
):
"""
Initializes the ReplyOnStopWords handler.
Args:
fn: The generator function to execute upon stop word and pause detection.
It receives `(sample_rate, audio_array)` and optionally `*args`.
stop_words: A list of strings (words or phrases) to listen for.
Detection is case-insensitive and ignores punctuation.
startup_fn: An optional function to run once at the beginning.
algo_options: Options for the pause detection algorithm (used after stop word).
model_options: Options for the VAD model.
can_interrupt: If True, incoming audio during `fn` execution
will stop the generator and process the new audio.
expected_layout: Expected input audio layout ('mono' or 'stereo').
output_sample_rate: The sample rate expected for audio yielded by `fn`.
output_frame_size: Deprecated.
input_sample_rate: The expected sample rate of incoming audio.
model: An optional pre-initialized VAD model instance.
"""
super().__init__(
fn,
algo_options=algo_options,
startup_fn=startup_fn,
model_options=model_options,
can_interrupt=can_interrupt,
expected_layout=expected_layout,
output_sample_rate=output_sample_rate,
output_frame_size=output_frame_size,
input_sample_rate=input_sample_rate,
model=model,
)
self.stop_words = stop_words
self.state = ReplyOnStopWordsState()
self.stt_model = get_stt_model("moonshine/base")
def stop_word_detected(self, text: str) -> bool:
"""
Checks if any of the configured stop words are present in the text.
Performs a case-insensitive search, treating multi-word stop phrases
correctly and ignoring basic punctuation.
Args:
text: The text transcribed from the audio.
Returns:
True if a stop word is found, False otherwise.
"""
for stop_word in self.stop_words:
stop_word = stop_word.lower().strip().split(" ")
if bool(
re.search(
r"\b" + r"\s+".join(map(re.escape, stop_word)) + r"[.,!?]*\b",
text.lower(),
)
):
logger.debug("Stop word detected: %s", stop_word)
return True
return False
async def _send_stopword(
self,
):
"""Internal async method to send a 'stopword' message via the channel."""
if self.channel:
self.channel.send(create_message("stopword", ""))
logger.debug("Sent stopword")
def send_stopword(self):
"""Sends a 'stopword' message asynchronously via the communication channel."""
asyncio.run_coroutine_threadsafe(self._send_stopword(), self.loop)
def determine_pause( # type: ignore
self, audio: np.ndarray, sampling_rate: int, state: ReplyOnStopWordsState
) -> bool:
"""
Analyzes an audio chunk to detect stop words and subsequent pauses.
Overrides the `ReplyOnPause.determine_pause` method.
First, it performs STT on the audio buffer to detect stop words.
Once a stop word is detected (`state.stop_word_detected` is True), it then
uses the VAD model (similar to `ReplyOnPause`) to detect a pause in the
audio *following* the stop word.
Args:
audio: The numpy array containing the audio chunk.
sampling_rate: The sample rate of the audio chunk.
state: The current application state (ReplyOnStopWordsState).
Returns:
True if a stop word has been detected and a subsequent pause
satisfying the configured thresholds is detected, False otherwise.
"""
import librosa
duration = len(audio) / sampling_rate
if duration >= self.algo_options.audio_chunk_duration:
if not state.stop_word_detected:
audio_f32 = audio_to_float32(audio)
audio_rs = librosa.resample(
audio_f32, orig_sr=sampling_rate, target_sr=16000
)
if state.post_stop_word_buffer is None:
state.post_stop_word_buffer = audio_rs
else:
state.post_stop_word_buffer = np.concatenate(
(state.post_stop_word_buffer, audio_rs)
)
if len(state.post_stop_word_buffer) / 16000 > 2:
state.post_stop_word_buffer = state.post_stop_word_buffer[-32000:]
dur_vad, chunks = self.model.vad(
(16000, state.post_stop_word_buffer),
self.model_options,
)
text = stt_for_chunks(
self.stt_model, (16000, state.post_stop_word_buffer), chunks
)
logger.debug(f"STT: {text}")
state.stop_word_detected = self.stop_word_detected(text)
if state.stop_word_detected:
logger.debug("Stop word detected")
self.send_stopword()
state.buffer = None
else:
dur_vad, _ = self.model.vad((sampling_rate, audio), self.model_options)
logger.debug("VAD duration: %s", dur_vad)
if (
dur_vad > self.algo_options.started_talking_threshold
and not state.started_talking
and state.stop_word_detected
):
state.started_talking = True
logger.debug("Started talking")
if state.started_talking:
if state.stream is None:
state.stream = audio
else:
state.stream = np.concatenate((state.stream, audio))
state.buffer = None
if (
dur_vad < self.algo_options.speech_threshold
and state.started_talking
and state.stop_word_detected
):
return True
return False
def reset(self):
"""
Resets the handler state to its initial condition.
Clears accumulated audio, resets state flags (including stop word state),
closes any active generator, and clears the event flag.
"""
super().reset()
self.generator = None
self.event.clear()
self.state = ReplyOnStopWordsState()
def copy(self):
"""Creates a new instance of ReplyOnStopWords with the same configuration."""
return ReplyOnStopWords(
self.fn,
self.stop_words,
self.startup_fn,
self.algo_options,
self.model_options,
self.can_interrupt,
self.expected_layout,
self.output_sample_rate,
self.output_frame_size,
self.input_sample_rate,
self.model,
)
```
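A hedged sketch of a stop-word-triggered stream; it assumes the `vad` and `stt` extras are installed (see the import errors raised in the VAD and Moonshine modules):
```python
# Hedged sketch: reply only after a stop word ("computer") followed by a pause.
from fastrtc import ReplyOnStopWords, Stream


def respond(audio):
    # `audio` is the (sample_rate, array) captured *after* the stop word.
    yield audio


stream = Stream(
    handler=ReplyOnStopWords(respond, stop_words=["computer", "hello llama"]),
    modality="audio",
    mode="send-receive",
)
```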
## /backend/fastrtc/speech_to_text/__init__.py
```py path="/backend/fastrtc/speech_to_text/__init__.py"
from .stt_ import MoonshineSTT, get_stt_model, stt_for_chunks
__all__ = ["get_stt_model", "MoonshineSTT", "get_stt_model", "stt_for_chunks"]
```
## /backend/fastrtc/speech_to_text/stt_.py
```py path="/backend/fastrtc/speech_to_text/stt_.py"
from functools import lru_cache
from pathlib import Path
from typing import Literal, Protocol
import click
import librosa
import numpy as np
from numpy.typing import NDArray
from ..utils import AudioChunk, audio_to_float32
curr_dir = Path(__file__).parent
class STTModel(Protocol):
def stt(self, audio: tuple[int, NDArray[np.int16 | np.float32]]) -> str: ...
class MoonshineSTT(STTModel):
def __init__(
self, model: Literal["moonshine/base", "moonshine/tiny"] = "moonshine/base"
):
try:
from moonshine_onnx import MoonshineOnnxModel, load_tokenizer
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Install fastrtc[stt] for speech-to-text and stopword detection support."
)
self.model = MoonshineOnnxModel(model_name=model)
self.tokenizer = load_tokenizer()
def stt(self, audio: tuple[int, NDArray[np.int16 | np.float32]]) -> str:
sr, audio_np = audio # type: ignore
audio_np = audio_to_float32(audio_np)
if sr != 16000:
audio_np: NDArray[np.float32] = librosa.resample(
audio_np, orig_sr=sr, target_sr=16000
)
if audio_np.ndim == 1:
audio_np = audio_np.reshape(1, -1)
tokens = self.model.generate(audio_np)
return self.tokenizer.decode_batch(tokens)[0]
@lru_cache
def get_stt_model(
model: Literal["moonshine/base", "moonshine/tiny"] = "moonshine/base",
) -> STTModel:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
m = MoonshineSTT(model)
from moonshine_onnx import load_audio
audio = load_audio(str(curr_dir / "test_file.wav"))
print(click.style("INFO", fg="green") + ":\t Warming up STT model.")
m.stt((16000, audio))
print(click.style("INFO", fg="green") + ":\t STT model warmed up.")
return m
def stt_for_chunks(
stt_model: STTModel,
audio: tuple[int, NDArray[np.int16 | np.float32]],
chunks: list[AudioChunk],
) -> str:
sr, audio_np = audio
return " ".join(
[
stt_model.stt((sr, audio_np[chunk["start"] : chunk["end"]]))
for chunk in chunks
]
)
```
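To see these helpers in isolation: `get_stt_model` builds and warms up a cached `MoonshineSTT` instance, and `stt` accepts a `(sample_rate, samples)` tuple. A small sketch, assuming the `fastrtc[stt]` extra (moonshine-onnx) is installed; the silent buffer is a stand-in for real 16 kHz speech.
```py
# Sketch: transcribe a one-second audio buffer with the Moonshine STT helper.
# Requires the `fastrtc[stt]` extra; the silent buffer is a placeholder for real
# microphone audio and will produce an empty or near-empty transcript.
import numpy as np
from fastrtc.speech_to_text import get_stt_model

model = get_stt_model("moonshine/base")  # cached by lru_cache and warmed up once

sample_rate = 16000
samples = np.zeros(sample_rate, dtype=np.float32)  # one second of silence
print(repr(model.stt((sample_rate, samples))))
```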
## /backend/fastrtc/speech_to_text/test_file.wav
Binary file available at https://raw.githubusercontent.com/gradio-app/fastrtc/refs/heads/main/backend/fastrtc/speech_to_text/test_file.wav
## /backend/fastrtc/stream.py
```py path="/backend/fastrtc/stream.py"
import inspect
import logging
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import (
Any,
Literal,
TypedDict,
cast,
)
import anyio
import gradio as gr
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse
from gradio import Blocks
from gradio.components.base import Component
from pydantic import BaseModel
from typing_extensions import NotRequired
from .tracks import HandlerType, StreamHandlerImpl
from .utils import RTCConfigurationCallable
from .webrtc import WebRTC
from .webrtc_connection_mixin import WebRTCConnectionMixin
from .websocket import WebSocketHandler
logger = logging.getLogger(__name__)
curr_dir = Path(__file__).parent
class Body(BaseModel):
sdp: str | None = None
candidate: dict[str, Any] | None = None
type: str
webrtc_id: str
class UIArgs(TypedDict):
"""
UI customization arguments for the Gradio Blocks UI of the Stream class
"""
title: NotRequired[str]
"""Title of the demo"""
subtitle: NotRequired[str]
"""Subtitle of the demo. Text will be centered and displayed below the title."""
icon: NotRequired[str]
"""Icon to display on the button instead of the wave animation. The icon should be a path/url to a .svg/.png/.jpeg file."""
icon_button_color: NotRequired[str]
"""Color of the icon button. Default is var(--color-accent) of the demo theme."""
pulse_color: NotRequired[str]
"""Color of the pulse animation. Default is var(--color-accent) of the demo theme."""
icon_radius: NotRequired[int]
"""Border radius of the icon button expressed as a percentage of the button size. Default is 50%."""
send_input_on: NotRequired[Literal["submit", "change"]]
"""When to send the input to the handler. Default is "change".
If "submit", the input will be sent when the submit event is triggered by the user.
If "change", the input will be sent whenever the user changes the input value.
"""
hide_title: NotRequired[bool]
"""If True, the title and subtitle will not be displayed."""
class Stream(WebRTCConnectionMixin):
"""
Define an audio or video stream with a built-in UI, mountable on a FastAPI app.
This class encapsulates the logic for handling real-time communication (WebRTC)
streams, including setting up peer connections, managing tracks, generating
a Gradio user interface, and integrating with FastAPI for API endpoints.
It supports different modes (send, receive, send-receive) and modalities
(audio, video, audio-video), and can optionally handle additional Gradio
input/output components alongside the stream. It also provides functionality
for telephone integration via the FastPhone method.
Attributes:
mode (Literal["send-receive", "receive", "send"]): The direction of the stream.
modality (Literal["video", "audio", "audio-video"]): The type of media stream.
rtp_params (dict[str, Any] | None): Parameters for RTP encoding.
event_handler (HandlerType): The main function to process stream data.
concurrency_limit (int): The maximum number of concurrent connections allowed.
time_limit (float | None): Time limit in seconds for the event handler execution.
allow_extra_tracks (bool): Whether to allow extra tracks beyond the specified modality.
additional_output_components (list[Component] | None): Extra Gradio output components.
additional_input_components (list[Component] | None): Extra Gradio input components.
additional_outputs_handler (Callable | None): Handler for additional outputs.
track_constraints (dict[str, Any] | None): Constraints for media tracks (e.g., resolution).
webrtc_component (WebRTC): The underlying Gradio WebRTC component instance.
rtc_configuration (dict[str, Any] | None): Configuration for the RTCPeerConnection (e.g., ICE servers).
_ui (Blocks): The Gradio Blocks UI instance.
"""
def __init__(
self,
handler: HandlerType,
*,
additional_outputs_handler: Callable | None = None,
mode: Literal["send-receive", "receive", "send"] = "send-receive",
modality: Literal["video", "audio", "audio-video"] = "video",
concurrency_limit: int | None | Literal["default"] = "default",
time_limit: float | None = None,
allow_extra_tracks: bool = False,
rtp_params: dict[str, Any] | None = None,
rtc_configuration: RTCConfigurationCallable | None = None,
server_rtc_configuration: dict[str, Any] | None = None,
track_constraints: dict[str, Any] | None = None,
additional_inputs: list[Component] | None = None,
additional_outputs: list[Component] | None = None,
ui_args: UIArgs | None = None,
):
"""
Initialize the Stream instance.
Args:
handler: The function to handle incoming stream data and return output data.
additional_outputs_handler: An optional function to handle updates to additional output components.
mode: The direction of the stream ('send', 'receive', or 'send-receive').
modality: The type of media ('video', 'audio', or 'audio-video').
concurrency_limit: Maximum number of concurrent connections. 'default' maps to 1.
time_limit: Maximum execution time for the handler function in seconds.
allow_extra_tracks: If True, allows connections with tracks not matching the modality.
rtp_params: Optional dictionary of RTP encoding parameters.
rtc_configuration: Optional Callable or dictionary for RTCPeerConnection configuration (e.g., ICE servers).
Required when deploying on Colab or Spaces.
server_rtc_configuration: Optional dictionary for the server-side RTCPeerConnection configuration. Note
that setting iceServers to an empty list means no ICE servers will be used on the server.
track_constraints: Optional dictionary of constraints for media tracks (e.g., resolution, frame rate).
additional_inputs: Optional list of extra Gradio input components.
additional_outputs: Optional list of extra Gradio output components. Requires `additional_outputs_handler`.
ui_args: Optional dictionary to customize the default UI appearance (title, subtitle, icon, etc.).
Raises:
ValueError: If `additional_outputs` are provided without `additional_outputs_handler`.
"""
WebRTCConnectionMixin.__init__(self)
self.mode = mode
self.modality = modality
self.rtp_params = rtp_params
self.event_handler = handler
self.concurrency_limit = cast(
int,
1 if concurrency_limit in ["default", None] else concurrency_limit,
)
self.concurrency_limit_gradio = cast(
int | Literal["default"] | None, concurrency_limit
)
self.time_limit = time_limit
self.allow_extra_tracks = allow_extra_tracks
self.additional_output_components = additional_outputs
self.additional_input_components = additional_inputs
self.additional_outputs_handler = additional_outputs_handler
self.track_constraints = track_constraints
self.webrtc_component: WebRTC
self.rtc_configuration = rtc_configuration
self.server_rtc_configuration = self.convert_to_aiortc_format(
server_rtc_configuration
)
self._ui = self._generate_default_ui(ui_args)
self._ui.launch = self._wrap_gradio_launch(self._ui.launch)
def mount(self, app: FastAPI, path: str = ""):
"""
Mount the stream's API endpoints onto a FastAPI application.
This method adds the necessary routes (`/webrtc/offer`, `/telephone/handler`,
`/telephone/incoming`, `/websocket/offer`) to the provided FastAPI app,
prefixed with the optional `path`. It also injects a startup message
into the app's lifespan.
Args:
app: The FastAPI application instance.
path: An optional URL prefix for the mounted routes.
"""
from fastapi import APIRouter
router = APIRouter(prefix=path)
router.post("/webrtc/offer")(self.offer)
router.websocket("/telephone/handler")(self.telephone_handler)
router.post("/telephone/incoming")(self.handle_incoming_call)
router.websocket("/websocket/offer")(self.websocket_offer)
lifespan = self._inject_startup_message(app.router.lifespan_context)
app.router.lifespan_context = lifespan
app.include_router(router)
@staticmethod
def _print_error(env: Literal["colab", "spaces"]):
"""
Print an error message and raise RuntimeError for missing rtc_configuration.
Used internally when running in Colab or Spaces without necessary WebRTC setup.
Args:
env: The environment ('colab' or 'spaces') where the error occurred.
Raises:
RuntimeError: Always raised after printing the error message.
"""
import click
print(
click.style("ERROR", fg="red")
+ f":\t Running in {env} is not possible without providing a valid rtc_configuration. "
+ "See "
+ click.style("https://fastrtc.org/deployment/", fg="cyan")
+ " for more information."
)
raise RuntimeError(
f"Running in {env} is not possible without providing a valid rtc_configuration. "
+ "See https://fastrtc.org/deployment/ for more information."
)
def _check_colab_or_spaces(self):
"""
Check if running in Colab or Spaces and if rtc_configuration is missing.
Calls `_print_error` if the conditions are met.
Raises:
RuntimeError: If running in Colab/Spaces without `rtc_configuration`.
"""
from gradio.utils import colab_check, get_space
if colab_check() and not self.rtc_configuration:
self._print_error("colab")
if get_space() and not self.rtc_configuration:
self._print_error("spaces")
def _wrap_gradio_launch(self, callable):
"""
Wrap the Gradio launch method to inject environment checks.
Ensures that `_check_colab_or_spaces` is called during the application
lifespan when `Blocks.launch()` is invoked.
Args:
callable: The original `gradio.Blocks.launch` method.
Returns:
A wrapped version of the launch method.
"""
import contextlib
def wrapper(*args, **kwargs):
lifespan = kwargs.get("app_kwargs", {}).get("lifespan", None)
@contextlib.asynccontextmanager
async def new_lifespan(app: FastAPI):
if lifespan is None:
self._check_colab_or_spaces()
yield
else:
async with lifespan(app):
self._check_colab_or_spaces()
yield
if "app_kwargs" not in kwargs:
kwargs["app_kwargs"] = {}
kwargs["app_kwargs"]["lifespan"] = new_lifespan
return callable(*args, **kwargs)
return wrapper
def _inject_startup_message(
self, lifespan: Callable[[FastAPI], AbstractAsyncContextManager] | None = None
):
"""
Create a FastAPI lifespan context manager to print startup messages and check environment.
Args:
lifespan: An optional existing lifespan context manager to wrap.
Returns:
An async context manager function suitable for `FastAPI(lifespan=...)`.
"""
import contextlib
import click
def print_startup_message():
self._check_colab_or_spaces()
print(
click.style("INFO", fg="green")
+ ":\t Visit "
+ click.style("https://fastrtc.org/userguide/api/", fg="cyan")
+ " for WebRTC or Websocket API docs."
)
@contextlib.asynccontextmanager
async def new_lifespan(app: FastAPI):
if lifespan is None:
print_startup_message()
yield
else:
async with lifespan(app):
print_startup_message()
yield
return new_lifespan
def _generate_default_ui(
self,
ui_args: UIArgs | None = None,
) -> Blocks:
"""
Generate the default Gradio UI based on mode, modality, and arguments.
Constructs a `gradio.Blocks` interface with the appropriate WebRTC component
and any specified additional input/output components.
Args:
ui_args: Optional dictionary containing UI customization arguments
(title, subtitle, icon, etc.).
Returns:
A `gradio.Blocks` instance representing the generated UI.
Raises:
ValueError: If `additional_outputs` are provided without
`additional_outputs_handler`.
ValueError: If the combination of `mode` and `modality` is invalid
or not supported for UI generation.
"""
ui_args = ui_args or {}
same_components = []
additional_input_components = self.additional_input_components or []
additional_output_components = self.additional_output_components or []
if additional_output_components and not self.additional_outputs_handler:
raise ValueError(
"additional_outputs_handler must be provided if there are additional output components."
)
if additional_input_components and additional_output_components:
same_components = [
component
for component in additional_input_components
if component in additional_output_components
]
for component in additional_output_components:
if component in same_components:
same_components.append(component)
if self.modality == "video" and self.mode == "receive":
with gr.Blocks() as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
with gr.Column():
if additional_input_components:
for component in additional_input_components:
component.render()
button = gr.Button("Start Stream", variant="primary")
with gr.Column():
output_video = WebRTC(
label="Video Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="receive",
modality="video",
)
self.webrtc_component = output_video
for component in additional_output_components:
if component not in same_components:
component.render()
output_video.stream(
fn=self.event_handler,
inputs=self.additional_input_components,
outputs=[output_video],
trigger=button.click,
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
output_video.on_additional_outputs(
self.additional_outputs_handler,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
inputs=additional_output_components,
outputs=additional_output_components,
)
elif self.modality == "video" and self.mode == "send":
with gr.Blocks() as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
if additional_input_components:
with gr.Column():
for component in additional_input_components:
component.render()
with gr.Column():
output_video = WebRTC(
label="Video Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="send",
modality="video",
)
self.webrtc_component = output_video
for component in additional_output_components:
if component not in same_components:
component.render()
output_video.stream(
fn=self.event_handler,
inputs=[output_video] + additional_input_components,
outputs=[output_video],
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
output_video.on_additional_outputs(
self.additional_outputs_handler,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
inputs=additional_output_components,
outputs=additional_output_components,
)
elif self.modality == "video" and self.mode == "send-receive":
css = """.my-group {max-width: 600px !important; max-height: 600 !important;}
.my-column {display: flex !important; justify-content: center !important; align-items: center !important};"""
with gr.Blocks(css=css) as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Video Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Column(elem_classes=["my-column"]):
with gr.Group(elem_classes=["my-group"]):
image = WebRTC(
label="Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="send-receive",
modality="video",
)
for component in additional_input_components:
component.render()
if additional_output_components:
with gr.Column():
for component in additional_output_components:
if component not in same_components:
component.render()
self.webrtc_component = image
image.stream(
fn=self.event_handler,
inputs=[image] + additional_input_components,
outputs=[image],
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
image.on_additional_outputs(
self.additional_outputs_handler,
inputs=additional_output_components,
outputs=additional_output_components,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
)
elif self.modality == "audio" and self.mode == "receive":
with gr.Blocks() as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
with gr.Column():
for component in additional_input_components:
component.render()
button = gr.Button("Start Stream", variant="primary")
with gr.Column():
output_video = WebRTC(
label="Audio Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="receive",
modality="audio",
icon=ui_args.get("icon"),
icon_button_color=ui_args.get("icon_button_color"),
pulse_color=ui_args.get("pulse_color"),
icon_radius=ui_args.get("icon_radius"),
)
self.webrtc_component = output_video
for component in additional_output_components:
if component not in same_components:
component.render()
output_video.stream(
fn=self.event_handler,
inputs=self.additional_input_components,
outputs=[output_video],
trigger=button.click,
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
output_video.on_additional_outputs(
self.additional_outputs_handler,
inputs=additional_output_components,
outputs=additional_output_components,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
)
elif self.modality == "audio" and self.mode == "send":
with gr.Blocks() as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
with gr.Column():
with gr.Group():
image = WebRTC(
label="Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="send",
modality="audio",
icon=ui_args.get("icon"),
icon_button_color=ui_args.get("icon_button_color"),
pulse_color=ui_args.get("pulse_color"),
icon_radius=ui_args.get("icon_radius"),
)
self.webrtc_component = image
for component in additional_input_components:
if component not in same_components:
component.render()
if additional_output_components:
with gr.Column():
for component in additional_output_components:
component.render()
image.stream(
fn=self.event_handler,
inputs=[image] + additional_input_components,
outputs=[image],
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
image.on_additional_outputs(
self.additional_outputs_handler,
inputs=additional_output_components,
outputs=additional_output_components,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
)
elif self.modality == "audio" and self.mode == "send-receive":
with gr.Blocks() as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Audio Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
with gr.Column():
with gr.Group():
image = WebRTC(
label="Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="send-receive",
modality="audio",
icon=ui_args.get("icon"),
icon_button_color=ui_args.get("icon_button_color"),
pulse_color=ui_args.get("pulse_color"),
icon_radius=ui_args.get("icon_radius"),
)
self.webrtc_component = image
for component in additional_input_components:
if component not in same_components:
component.render()
if additional_output_components:
with gr.Column():
for component in additional_output_components:
component.render()
image.stream(
fn=self.event_handler,
inputs=[image] + additional_input_components,
outputs=[image],
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
image.on_additional_outputs(
self.additional_outputs_handler,
inputs=additional_output_components,
outputs=additional_output_components,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
)
elif self.modality == "audio-video" and self.mode == "send-receive":
css = """.my-group {max-width: 600px !important; max-height: 600 !important;}
.my-column {display: flex !important; justify-content: center !important; align-items: center !important};"""
with gr.Blocks(css=css) as demo:
if not ui_args.get("hide_title"):
gr.HTML(
f"""
{ui_args.get("title", "Audio Video Streaming (Powered by FastRTC ⚡️)")}
"""
)
if ui_args.get("subtitle"):
gr.Markdown(
f"""
{ui_args.get("subtitle")}
"""
)
with gr.Row():
with gr.Column(elem_classes=["my-column"]):
with gr.Group(elem_classes=["my-group"]):
image = WebRTC(
label="Stream",
rtc_configuration=self.rtc_configuration,
track_constraints=self.track_constraints,
mode="send-receive",
modality="audio-video",
icon=ui_args.get("icon"),
icon_button_color=ui_args.get("icon_button_color"),
pulse_color=ui_args.get("pulse_color"),
icon_radius=ui_args.get("icon_radius"),
)
self.webrtc_component = image
for component in additional_input_components:
if component not in same_components:
component.render()
if additional_output_components:
with gr.Column():
for component in additional_output_components:
component.render()
image.stream(
fn=self.event_handler,
inputs=[image] + additional_input_components,
outputs=[image],
time_limit=self.time_limit,
concurrency_limit=self.concurrency_limit, # type: ignore
send_input_on=ui_args.get("send_input_on", "change"),
)
if additional_output_components:
assert self.additional_outputs_handler
image.on_additional_outputs(
self.additional_outputs_handler,
inputs=additional_output_components,
outputs=additional_output_components,
concurrency_limit=self.concurrency_limit_gradio, # type: ignore
)
else:
raise ValueError(f"Invalid modality: {self.modality} and mode: {self.mode}")
return demo
@property
def ui(self) -> Blocks:
"""
Get the Gradio Blocks UI instance associated with this stream.
Returns:
The `gradio.Blocks` UI instance.
"""
return self._ui
@ui.setter
def ui(self, blocks: Blocks):
"""
Set a custom Gradio Blocks UI for this stream.
Args:
blocks: The `gradio.Blocks` instance to use as the UI.
"""
self._ui = blocks
async def offer(self, body: Body):
"""
Handle an incoming WebRTC offer via HTTP POST.
Processes the SDP offer and ICE candidates from the client to establish
a WebRTC connection.
Args:
body: A Pydantic model containing the SDP offer, optional ICE candidate,
type ('offer'), and a unique WebRTC ID.
Returns:
A dictionary containing the SDP answer generated by the server.
"""
return await self.handle_offer(
body.model_dump(), set_outputs=self.set_additional_outputs(body.webrtc_id)
)
async def get_rtc_configuration(self):
if inspect.isfunction(self.rtc_configuration):
if inspect.iscoroutinefunction(self.rtc_configuration):
return await self.rtc_configuration()
else:
return await anyio.to_thread.run_sync(self.rtc_configuration)  # type: ignore
else:
return self.rtc_configuration
async def handle_incoming_call(self, request: Request):
"""
Handle incoming telephone calls (e.g., via Twilio).
Generates TwiML instructions to connect the incoming call to the
WebSocket handler (`/telephone/handler`) for audio streaming.
Args:
request: The FastAPI Request object for the incoming call webhook.
Returns:
An HTMLResponse containing the TwiML instructions as XML.
"""
from twilio.twiml.voice_response import Connect, VoiceResponse
response = VoiceResponse()
response.say("Connecting to the AI assistant.")
connect = Connect()
path = request.url.path.removesuffix("/telephone/incoming")
connect.stream(url=f"wss://{request.url.hostname}{path}/telephone/handler")
response.append(connect)
response.say("The call has been disconnected.")
return HTMLResponse(content=str(response), media_type="application/xml")
async def telephone_handler(self, websocket: WebSocket):
"""
WebSocket endpoint for streaming audio to and from a Twilio phone call.
Args:
websocket: The incoming WebSocket connection object.
"""
handler = cast(StreamHandlerImpl, self.event_handler.copy()) # type: ignore
handler.phone_mode = True
async def set_handler(s: str, a: WebSocketHandler):
if len(self.connections) >= self.concurrency_limit: # type: ignore
await cast(WebSocket, a.websocket).send_json(
{
"status": "failed",
"meta": {
"error": "concurrency_limit_reached",
"limit": self.concurrency_limit,
},
}
)
await websocket.close()
return
ws = WebSocketHandler(
handler, set_handler, lambda s: None, lambda s: lambda a: None
)
await ws.handle_websocket(websocket)
async def websocket_offer(self, websocket: WebSocket):
"""
Handle WebRTC signaling over a WebSocket connection.
Provides an alternative to the HTTP POST `/webrtc/offer` endpoint for
exchanging SDP offers/answers and ICE candidates via WebSocket messages.
Args:
websocket: The incoming WebSocket connection object.
"""
handler = cast(StreamHandlerImpl, self.event_handler.copy()) # type: ignore
handler.phone_mode = False
async def set_handler(s: str, a: WebSocketHandler):
if len(self.connections) >= self.concurrency_limit: # type: ignore
await cast(WebSocket, a.websocket).send_json(
{
"status": "failed",
"meta": {
"error": "concurrency_limit_reached",
"limit": self.concurrency_limit,
},
}
)
await websocket.close()
return
self.connections[s] = [a] # type: ignore
def clean_up(s):
self.clean_up(s)
ws = WebSocketHandler(
handler, set_handler, clean_up, lambda s: self.set_additional_outputs(s)
)
await ws.handle_websocket(websocket)
def fastphone(
self,
token: str | None = None,
host: str = "127.0.0.1",
port: int = 8000,
**kwargs,
):
"""
Launch the FastPhone service for telephone integration.
Starts a local FastAPI server, mounts the stream, creates a public tunnel
(using Gradio's tunneling), registers the tunnel URL with the FastPhone
backend service, and prints the assigned phone number and access code.
This allows users to call the phone number and interact with the stream handler.
Args:
token: Optional Hugging Face Hub token for authentication with the
FastPhone service. If None, attempts to find one automatically.
host: The local host address to bind the server to.
port: The local port to bind the server to.
**kwargs: Additional keyword arguments passed to `uvicorn.run`.
Raises:
httpx.HTTPStatusError: If registration with the FastPhone service fails.
RuntimeError: If running in Colab/Spaces without `rtc_configuration`.
"""
import atexit
import inspect
import secrets
import threading
import time
import urllib.parse
import click
import httpx
import uvicorn
from gradio.networking import setup_tunnel
from gradio.tunneling import CURRENT_TUNNELS
from huggingface_hub import get_token
app = FastAPI()
self.mount(app)
t = threading.Thread(
target=uvicorn.run,
args=(app,),
kwargs={"host": host, "port": port, **kwargs},
)
t.start()
# Check if setup_tunnel accepts share_server_tls_certificate parameter
setup_tunnel_params = inspect.signature(setup_tunnel).parameters
tunnel_kwargs = {
"local_host": host,
"local_port": port,
"share_token": secrets.token_urlsafe(32),
"share_server_address": None,
}
if "share_server_tls_certificate" in setup_tunnel_params:
tunnel_kwargs["share_server_tls_certificate"] = None
url = setup_tunnel(**tunnel_kwargs)
host = urllib.parse.urlparse(url).netloc
URL = "https://api.fastrtc.org"
try:
r = httpx.post(
URL + "/register",
json={"url": host},
headers={"Authorization": token or get_token() or ""},
)
r.raise_for_status()
except Exception:
URL = "https://fastrtc-fastphone.hf.space"
r = httpx.post(
URL + "/register",
json={"url": host},
headers={"Authorization": token or get_token() or ""},
)
r.raise_for_status()
if r.status_code == 202:
print(
click.style("INFO", fg="orange")
+ ":\t You have "
+ "run out of your quota"
)
return
data = r.json()
code = f"{data['code']}"
phone_number = data["phone"]
reset_date = data["reset_date"]
print(
click.style("INFO", fg="green")
+ ":\t Your FastPhone is now live! Call "
+ click.style(phone_number, fg="cyan")
+ " and use code "
+ click.style(code, fg="cyan")
+ " to connect to your stream."
)
minutes = str(int(data["time_remaining"] // 60)).zfill(2)
seconds = str(int(data["time_remaining"] % 60)).zfill(2)
print(
click.style("INFO", fg="green")
+ ":\t You have "
+ click.style(f"{minutes}:{seconds}", fg="cyan")
+ " minutes remaining in your quota (Resetting on "
+ click.style(f"{reset_date}", fg="cyan")
+ ")"
)
print(
click.style("INFO", fg="green")
+ ":\t Visit "
+ click.style(
"https://fastrtc.org/userguide/audio/#telephone-integration",
fg="cyan",
)
+ " for information on making your handler compatible with phone usage."
)
def unregister():
httpx.post(
URL + "/unregister",
json={"url": host, "code": code},
headers={"Authorization": token or get_token() or ""},
)
atexit.register(unregister)
try:
while True:
time.sleep(0.1)
except (KeyboardInterrupt, OSError):
print(
click.style("INFO", fg="green")
+ ":\t Keyboard interruption in main thread... closing server."
)
unregister()
t.join(timeout=5)
for tunnel in CURRENT_TUNNELS:
tunnel.kill()
```
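As a rough end-to-end sketch of the `Stream` API above: create a handler, mount the stream on a FastAPI app, and serve it. The echo handler, host, and port are illustrative, and the sketch assumes `ReplyOnPause` and `Stream` are importable from the package root.
```py
# Sketch: expose a Stream over FastAPI. The handler and port are placeholders;
# rtc_configuration is only required in environments like Colab or Spaces.
import numpy as np
import uvicorn
from fastapi import FastAPI
from fastrtc import ReplyOnPause, Stream  # assumed package-root re-exports


def echo(audio: tuple[int, np.ndarray]):
    # Send the caller's own audio back once they stop speaking.
    yield audio


stream = Stream(handler=ReplyOnPause(echo), modality="audio", mode="send-receive")

app = FastAPI()
stream.mount(app)  # adds /webrtc/offer, /websocket/offer, and /telephone/* routes

if __name__ == "__main__":
    # Alternatives: stream.ui.launch() for the built-in Gradio UI,
    # or stream.fastphone() to get a temporary phone number.
    uvicorn.run(app, host="127.0.0.1", port=8000)
```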
## /backend/fastrtc/templates/component/assets/worker-lPYB70QI.js
```js path="/backend/fastrtc/templates/component/assets/worker-lPYB70QI.js"
(function(){"use strict";const R="https://unpkg.com/@ffmpeg/core@0.12.6/dist/umd/ffmpeg-core.js";var E;(function(t){t.LOAD="LOAD",t.EXEC="EXEC",t.WRITE_FILE="WRITE_FILE",t.READ_FILE="READ_FILE",t.DELETE_FILE="DELETE_FILE",t.RENAME="RENAME",t.CREATE_DIR="CREATE_DIR",t.LIST_DIR="LIST_DIR",t.DELETE_DIR="DELETE_DIR",t.ERROR="ERROR",t.DOWNLOAD="DOWNLOAD",t.PROGRESS="PROGRESS",t.LOG="LOG",t.MOUNT="MOUNT",t.UNMOUNT="UNMOUNT"})(E||(E={}));const a=new Error("unknown message type"),f=new Error("ffmpeg is not loaded, call `await ffmpeg.load()` first"),u=new Error("failed to import ffmpeg-core.js");let r;const O=async({coreURL:t,wasmURL:n,workerURL:e})=>{const o=!r;try{t||(t=R),importScripts(t)}catch{if(t||(t=R.replace("/umd/","/esm/")),self.createFFmpegCore=(await import(t)).default,!self.createFFmpegCore)throw u}const s=t,c=n||t.replace(/.js$/g,".wasm"),b=e||t.replace(/.js$/g,".worker.js");return r=await self.createFFmpegCore({mainScriptUrlOrBlob:`${s}#${btoa(JSON.stringify({wasmURL:c,workerURL:b}))}`}),r.setLogger(i=>self.postMessage({type:E.LOG,data:i})),r.setProgress(i=>self.postMessage({type:E.PROGRESS,data:i})),o},l=({args:t,timeout:n=-1})=>{r.setTimeout(n),r.exec(...t);const e=r.ret;return r.reset(),e},m=({path:t,data:n})=>(r.FS.writeFile(t,n),!0),D=({path:t,encoding:n})=>r.FS.readFile(t,{encoding:n}),S=({path:t})=>(r.FS.unlink(t),!0),I=({oldPath:t,newPath:n})=>(r.FS.rename(t,n),!0),L=({path:t})=>(r.FS.mkdir(t),!0),N=({path:t})=>{const n=r.FS.readdir(t),e=[];for(const o of n){const s=r.FS.stat(`${t}/${o}`),c=r.FS.isDir(s.mode);e.push({name:o,isDir:c})}return e},A=({path:t})=>(r.FS.rmdir(t),!0),w=({fsType:t,options:n,mountPoint:e})=>{const o=t,s=r.FS.filesystems[o];return s?(r.FS.mount(s,n,e),!0):!1},k=({mountPoint:t})=>(r.FS.unmount(t),!0);self.onmessage=async({data:{id:t,type:n,data:e}})=>{const o=[];let s;try{if(n!==E.LOAD&&!r)throw f;switch(n){case E.LOAD:s=await O(e);break;case E.EXEC:s=l(e);break;case E.WRITE_FILE:s=m(e);break;case E.READ_FILE:s=D(e);break;case E.DELETE_FILE:s=S(e);break;case E.RENAME:s=I(e);break;case E.CREATE_DIR:s=L(e);break;case E.LIST_DIR:s=N(e);break;case E.DELETE_DIR:s=A(e);break;case E.MOUNT:s=w(e);break;case E.UNMOUNT:s=k(e);break;default:throw a}}catch(c){self.postMessage({id:t,type:E.ERROR,data:c.toString()});return}s instanceof Uint8Array&&o.push(s.buffer),self.postMessage({id:t,type:n,data:s},o)}})();
```